dotnet - Task Patterns
Published: Dotnet Estimated reading time: ~3 minutes
Tasks
Get The Number of Logical Cores
var logicalCoreCount = Environment.ProcessorCount;
Wait All One By One
List < Task <TResult>> tasks = new List < Task<TResult>> ();
for (int i = 0; i < N; i++) // Start N tasks:
{
tasks.Add( Task.Factory.StartNew<TResult> (code) );
}
while (tasks.Count > 0)
{
// Get the array index of the task that finishes first
int index = Task.WaitAny( tasks.ToArray() ); // This will never throw an exception here
// Catch the exception like this
try {
tasks[index].Wait();
// Process tasks[index].Result:
}
catch (AggregateException ae) {
ae = ae.Flatten();
foreach (Exception ex in ae.InnerExceptions){
Console.WriteLine(ex.Message);
}
}
// Or like this
try {
var r = tasks[index].Result;
}
catch (AggregateException ae) {
ae = ae.Flatten();
foreach (Exception ex in ae.InnerExceptions){
Console.WriteLine(ex.Message);
}
}
// Or this...
if (tasks[index].Exception != null) {
// An exception has occurred!
}
// Remove the task from the list
tasks.RemoveAt(index);
}
ALTERNATE FORM
List<Task<TResult>> tasks =
(from arg in args
select MyMethodAsync(arg)
).ToList();
while (tasks.Count > 0)
{
try
{
Task<TResult> task = await Task.WhenAny(tasks);
tasks.Remove(task);
var result = await task;
AnotherMethod(result);
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
}
##Catching errors
Task.WaitAll
Result
Task.Exception
catch (AggregateException ae)
{
ae = ae.Flatten();
foreach (Exception ex in ae.InnerExceptions){
Console.WriteLine(ex.Message);
}
}
Subscribing to the TaskScheduler.UnobservedTaskException
TaskScheduler.UnobservedTaskException +=
new EventHandler<UnobservedTaskExceptionEventArgs>(
TaskUnobservedException_Handler);
static void TaskUnobservedException_Handler(
object sender,
UnobservedTaskExceptionEventArgs e)
{
Console.WriteLine("Unobserved: ", e.Exception.Message);
e.SetObserved();
}
Wait All One By One Revised (working version)
// Let's say we have a file queue we want to process asynchonously.
Queue<string> fileQueue = GetMyFiles();
// Get The Number of Logical Cores
// We should not spawn any more tasks than we have cores.
var maxTaskCount = Environment.ProcessorCount;
List < Task <TResult>> tasks = new List < Task<TResult>> ();
// Start up a few tasks.
for (int i = 0; i <= maxTaskCount; i++)
{
if (fileList.Count > 0) {
tasks.Add( Task.Factory.StartNew<TResult> (MyMethodAsync(fileQueue.Dequeue()) );
}
else {
break;
}
}
while (tasks.Count > 0 || fileQueue.Count > 0)
{
// Get the array index of the task that finishes first
int index = Task.WaitAny( tasks.ToArray() ); // This will never throw an exception here
// Catch the exception like this
try {
tasks[index].Wait();
// Process tasks[index].Result:
}
catch (AggregateException ae) {
ae = ae.Flatten();
foreach (Exception ex in ae.InnerExceptions){
Console.WriteLine(ex.Message);
}
}
// Or like this
try {
var r = tasks[index].Result;
}
catch (AggregateException ae) {
ae = ae.Flatten();
foreach (Exception ex in ae.InnerExceptions){
Console.WriteLine(ex.Message);
}
}
// Or this...
if (tasks[index].Exception != null) {
// An exception has occurred!
}
// Remove the task from the list
tasks.RemoveAt(index);
// Add a new task to the list, if we still have files to process
// This keeps the threads busy
if (fileQueue.Count > 0) {
tasks.Add( Task.Factory.StartNew<TResult> (MyMethodAsync(fileQueue.Dequeue()) );
}
}
ALTERNATE FORM
List<Task<TResult>> tasks =
(from arg in args
select MyMethodAsync(arg)
).ToList();
while (tasks.Count > 0)
{
try
{
Task<TResult> task = await Task.WhenAny(tasks);
tasks.Remove(task);
var result = await task;
AnotherMethod(result);
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
}