Multithreading with the Task Parallel Library
2022-02-20
Related Articles
- Multithreads
- Sharing variables in parallel operations may lead to race conditions.
- What is a deadlock and how to avoid it from happening.
- PLINQ is the parallel version of LINQ.
1. Task Parallel Library (TPL) in C#
Task.Run executes work on another thread without blocking the current thread. Although that work runs in parallel with the caller, there are some differences between asynchronous and parallel programming:
- In asynchronous programming, we schedule a continuation that runs once the awaited work completes.
- A single Task.Run call uses only one additional thread to do the work. If you move a computationally intensive problem into it, that one thread still takes a long time, so it is not the most efficient way to solve the problem.
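To make the distinction concrete, here is a minimal sketch. The AsyncSketch class and the SumTo helper are made up for illustration. Task.Run offloads the whole computation to a single thread-pool thread, and everything after the await runs as the continuation.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncSketch
{
    // Hypothetical CPU-bound helper: sums the integers 1..n.
    public static long SumTo(int n)
    {
        long sum = 0;
        for (int i = 1; i <= n; i++)
        {
            sum += i;
        }
        return sum;
    }

    public static async Task<long> SumInBackgroundAsync(int n)
    {
        // The whole computation still runs on ONE thread-pool thread.
        long result = await Task.Run(() => SumTo(n));

        // Everything after the await is the scheduled continuation.
        return result;
    }

    public static async Task Main()
    {
        long total = await SumInBackgroundAsync(1_000);
        Console.WriteLine(total); // 500500
    }
}
```

The caller stays responsive while the work runs, but the work itself is not split across cores.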
Note
- You can write multiple async functions, but you will write more code than by leveraging the Parallel methods, e.g., Parallel.For, Parallel.ForEach, and Parallel.Invoke.
- Parallel distributes the work efficiently on whichever system runs the application.
  - However, none of the operations is guaranteed to run in parallel; that depends on the system the application runs on.
  - The order in which the actions execute is not guaranteed and should not matter.
- PLINQ (Parallel LINQ) is the parallel version of LINQ, built on top of the TPL.
- The Parallel methods use Task internally, and Task is an abstraction on top of threads.
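As a rough illustration of the three Parallel methods named above, the following sketch uses each one once. The ParallelSketch class and its method names are assumptions made for this example. Because the order of execution is not guaranteed, shared totals are updated with Interlocked.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelSketch
{
    // Parallel.For: index-based loop over [0, count).
    public static long SumOfSquares(int count)
    {
        long total = 0;
        Parallel.For(0, count, i =>
        {
            // Iterations may run on several threads, so update atomically.
            Interlocked.Add(ref total, (long)i * i);
        });
        return total;
    }

    // Parallel.ForEach: loop over the elements of a collection.
    public static int TotalLength(string[] words)
    {
        int total = 0;
        Parallel.ForEach(words, word =>
        {
            Interlocked.Add(ref total, word.Length);
        });
        return total;
    }

    public static void Main()
    {
        Console.WriteLine(SumOfSquares(10)); // 285
        Console.WriteLine(TotalLength(new[] { "task", "parallel", "library" })); // 19

        // Parallel.Invoke: run independent actions, possibly in parallel.
        // The two lines may be printed in either order.
        Parallel.Invoke(
            () => { Console.WriteLine("first action"); },
            () => { Console.WriteLine("second action"); });
    }
}
```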
2. Tips and Tricks
- The Parallel methods block the calling thread until all of their operations have finished.
  - We can use async and parallel together by wrapping the Parallel call in Task.Run and awaiting it.
  - This reduces the number of threads available for parallel processing (the async wrapper takes an extra one).
- If we invoke the Dispatcher synchronously (for example with Dispatcher.Invoke) inside a Parallel method that was called from the UI thread, it causes a deadlock: the Dispatcher runs on the UI thread, and the UI thread is blocked waiting for the Parallel method to return.
- Cancellation requests need to be handled in Parallel just like with Task: through a CancellationToken.
- MaxDegreeOfParallelism lets us change the maximum number of concurrent tasks. By default there is no limit, so the Parallel methods consume as much computing power as possible.
  - Whether to change it depends on your application. Usually we keep the default because the work completes faster.
  - Changing it manually might not make the best use of the available resources.
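The first and last tips can be combined in one small sketch. The TipsSketch class and the CountEvensAsync method are hypothetical names chosen for this example. Task.Run keeps the blocking Parallel.For call off the calling thread, and ParallelOptions caps the number of concurrent operations.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TipsSketch
{
    public static Task<int> CountEvensAsync(int count, int maxDegree)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegree };

        // Parallel.For blocks the thread it is called on, so the call is
        // wrapped in Task.Run to keep the caller (e.g. a UI thread) free.
        return Task.Run(() =>
        {
            int evens = 0;
            Parallel.For(0, count, options, i =>
            {
                if (i % 2 == 0)
                {
                    Interlocked.Increment(ref evens);
                }
            });
            return evens;
        });
    }

    public static async Task Main()
    {
        int evens = await CountEvensAsync(1_000, maxDegree: 2);
        Console.WriteLine(evens); // 500
    }
}
```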
3. Error Handling
- The Parallel methods collect exceptions for you: if any of the parallel operations throws, the exception is caught and rethrown as an AggregateException.
  - The AggregateException is thrown only once all running operations have completed.
  - Therefore, a failure does not interrupt operations that are already running. With Parallel.Invoke the remaining actions still run as well, whereas Parallel.For and Parallel.ForEach stop starting new iterations after an unhandled exception.
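A small sketch of this behaviour with Parallel.Invoke follows. The ErrorSketch class and the RunAll method are invented for illustration. Two of the three actions throw, the third still runs, and both exceptions arrive together in one AggregateException.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ErrorSketch
{
    // Returns how many actions failed and how many completed normally.
    public static (int Failures, int Completed) RunAll()
    {
        int completed = 0;
        int failures = 0;
        try
        {
            Parallel.Invoke(
                () => { throw new InvalidOperationException("first failure"); },
                () => { Interlocked.Increment(ref completed); },
                () => { throw new ArgumentException("second failure"); });
        }
        catch (AggregateException ex)
        {
            // One inner exception per failed action.
            failures = ex.InnerExceptions.Count;
            foreach (var inner in ex.InnerExceptions)
            {
                Console.WriteLine($"{inner.GetType().Name}: {inner.Message}");
            }
        }
        return (failures, completed);
    }

    public static void Main()
    {
        var (failures, completed) = RunAll();
        Console.WriteLine($"{failures} failed, {completed} completed"); // 2 failed, 1 completed
    }
}
```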
4. Parallel.For & Parallel.ForEach
- The loop body runs once per element, potentially in parallel.
- Both methods return a ParallelLoopResult, which tells you whether the execution completed successfully.
  - IsCompleted is true when the loop ran to completion: no cancellation, no Break, and no Stop.
  - LowestBreakIteration is only relevant if you're using Parallel.For; it holds the lowest index at which Break was called.
- Breaking out of loops
  - Break won't automatically stop operations that are already running.
  - You cannot use the break keyword as in normal loops. You must handle a requested break gracefully: once a break is requested, the iterations that are still waiting to be processed won't start. With Break, iterations below the breaking index still run; Stop ends the loop as soon as possible.
  - We can add a ParallelLoopState parameter to the loop body, call Break or Stop on it, and then check the state in the other iterations.
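The break mechanics can be sketched as follows. The BreakSketch class and the RunUntil method are hypothetical. The loop body takes a ParallelLoopState parameter, requests a break at one index, and the returned ParallelLoopResult reports what happened.

```csharp
using System;
using System.Threading.Tasks;

public static class BreakSketch
{
    public static ParallelLoopResult RunUntil(int breakAt)
    {
        return Parallel.For(0, 100, (i, state) =>
        {
            if (i == breakAt)
            {
                // Ask the loop not to start iterations above this index.
                state.Break();
                return;
            }

            // A long-running body could poll this flag and return early.
            if (state.ShouldExitCurrentIteration)
            {
                return;
            }

            // ... the real work for iteration i would go here ...
        });
    }

    public static void Main()
    {
        ParallelLoopResult result = RunUntil(50);
        Console.WriteLine(result.IsCompleted);          // False
        Console.WriteLine(result.LowestBreakIteration); // 50
    }
}
```

When no iteration calls Break, IsCompleted is true and LowestBreakIteration is null.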
4. Cancel Parallel Operations
We can use a cancellation token just as in async programming, because the underlying implementation of Parallel also uses the Task class.
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        var stopwatch = new Stopwatch();
        stopwatch.Start();
        var cancellationTokenSource = new CancellationTokenSource();
        // Request cancellation automatically after 2 seconds.
        cancellationTokenSource.CancelAfter(2000);
        var parallelOptions = new ParallelOptions
        {
            CancellationToken = cancellationTokenSource.Token,
            MaxDegreeOfParallelism = 1
        };
        int total = 0;
        try
        {
            Parallel.For(0, 100, parallelOptions, (i) =>
            {
                var result = Compute(i);
                Interlocked.Add(ref total, result);
            });
        }
        catch (OperationCanceledException)
        {
            // Thrown by Parallel.For once the token has been cancelled.
            Console.WriteLine("Cancellation Requested!");
        }
        Console.WriteLine(total);
        Console.WriteLine($"It took: {stopwatch.ElapsedMilliseconds}ms to run");
    }

    static Random random = new Random();

    // Simulates CPU-bound work by spinning for 10 to 50 ms.
    static int Compute(int value)
    {
        var randomMilliseconds = random.Next(10, 50);
        var end = DateTime.Now + TimeSpan.FromMilliseconds(randomMilliseconds);
        while (DateTime.Now < end) { }
        return value + 100;
    }
}
Once a cancellation is detected, no further iterations/operations will start, but executions that have already started will not be cancelled.
If we want to cancel the started operations too, we need to monitor the cancellation token ourselves by checking whether Token.IsCancellationRequested is true before starting the expensive work.
- Note: we usually check the cancellation token in between expensive operations.
Parallel.For(0, 100, parallelOptions, (i) =>
{
    if (parallelOptions.CancellationToken.IsCancellationRequested)
    {
        // roll back or other operations
        Console.WriteLine("Cancellation Detected!");
    }
    else
    {
        var result = Compute(i);
        Interlocked.Add(ref total, result);
    }
});
5. ThreadLocal and AsyncLocal Variables
Objective: give each thread its own local variable.
Option 1: ThreadLocal (X)
- It is not safe to use here because Task is an abstraction over threads and may reuse a thread, so a value left behind by an earlier operation can still be there.
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static ThreadLocal<int?> threadLocal = new ThreadLocal<int?>();

    static void Main()
    {
        var stopwatch = new Stopwatch();
        stopwatch.Start();
        int total = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
        Parallel.For(0, 100, options, (i) =>
        {
            // the threadLocal variable might not be null if a thread has been reused
            var currentValue = threadLocal.Value;
            threadLocal.Value = Compute(i);
        });
        Console.WriteLine(total);
        Console.WriteLine($"It took: {stopwatch.ElapsedMilliseconds}ms to run");
    }

    static Random random = new Random();

    // Simulates CPU-bound work by spinning for 10 to 50 ms.
    static int Compute(int value)
    {
        int randomMilliseconds;
        // Random is not thread-safe, so guard the shared instance.
        lock (random)
        {
            randomMilliseconds = random.Next(10, 50);
        }
        var end = DateTime.Now + TimeSpan.FromMilliseconds(randomMilliseconds);
        while (DateTime.Now < end) { }
        return value + 100;
    }
}
Option 2: AsyncLocal class
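AsyncLocal keeps its value in the execution context that flows with the task rather than in the thread itself. The parallel operations see the value set by the caller, and changes made inside them do not flow back to the caller.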
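using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static AsyncLocal<int?> asyncLocal = new AsyncLocal<int?>();

    static void Main()
    {
        var stopwatch = new Stopwatch();
        stopwatch.Start();
        asyncLocal.Value = 200;
        Parallel.For(0, 100, (i) =>
        {
            var currentValue = asyncLocal.Value;
            asyncLocal.Value = Compute(i);
        });
        // Renamed from currentValue: C# does not allow reusing the lambda's local name here.
        var valueAfterLoop = asyncLocal.Value; // still the same as original value
        Console.WriteLine(valueAfterLoop);
        Console.WriteLine($"It took: {stopwatch.ElapsedMilliseconds}ms to run");
    }

    static Random random = new Random();

    // Simulates CPU-bound work by spinning for 10 to 50 ms.
    static int Compute(int value)
    {
        int randomMilliseconds;
        // Random is not thread-safe, so guard the shared instance.
        lock (random)
        {
            randomMilliseconds = random.Next(10, 50);
        }
        var end = DateTime.Now + TimeSpan.FromMilliseconds(randomMilliseconds);
        while (DateTime.Now < end) { }
        return value + 100;
    }
}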