Implementing Parallel and Async Programming in C#

Recently, we encountered a scenario where we needed to process multiple request items concurrently. One of the simplest ways to achieve this is the following pattern:

await Task.WhenAll(source.Select(item => DoWorkAsync(item)));

This starts one task per item and waits until all of them have completed. Because every task is started immediately, all of the asynchronous operations run concurrently.

This works well when there are only a few request items and each operation is lightweight. With a large number of items, however, it becomes a problem, especially when each operation is resource-intensive.

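In such cases, nothing limits how many operations start at once: a list of 10,000 items means 10,000 operations in flight together. That can exhaust system resources such as memory, sockets, or the capacity of a downstream service, leading to errors or even causing the application to fail.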
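To make the problem concrete, here is a minimal sketch, assuming .NET 6 or later with top-level statements, that counts how many calls are in flight at once under the Task.WhenAll pattern. DoWorkAsync is a hypothetical stand-in that only waits 500 ms; with 100 items, all 100 calls are started before the first one finishes.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

int inFlight = 0;     // calls currently running
int peak = 0;         // highest value inFlight has reached
object gate = new();  // protects the two counters

// Hypothetical stand-in for a resource-intensive async operation.
async Task DoWorkAsync(int item)
{
    lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
    await Task.Delay(500); // simulate I/O
    lock (gate) { inFlight--; }
}

var source = Enumerable.Range(1, 100);

// Unbounded: Task.WhenAll starts one call per item right away.
await Task.WhenAll(source.Select(item => DoWorkAsync(item)));

Console.WriteLine($"Peak concurrent calls: {peak}");
```

Nothing in the pattern caps the peak, so it grows with the size of the input.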

To address this, we need a way to run multiple asynchronous operations concurrently but in a controlled manner, limiting the number of items processed at any one time to avoid resource exhaustion.

This is where partitioning comes into play. In this post, we'll explore how to use partitioning to process large sets of asynchronous operations efficiently and safely.

Converting a Collection into Partitions

Let’s say we have a collection of request items, and our system can safely process at most n of them concurrently. This n is the maximum degree of parallelism.

To respect this limit, we split the collection into n partitions and let each partition process its items one at a time. At most n items are then in progress at any moment, no matter how large the collection is.

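We can do this with the static Partitioner class from the System.Collections.Concurrent namespace. Its Create method wraps the source in a partitioner (an OrderablePartitioner<TSource>, which derives from Partitioner<TSource>), and GetPartitions splits it into the requested number of partitions: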

Partitioner
    .Create(source)
    .GetPartitions(maxDoP);
  • source is your input collection of type IEnumerable<TSource>.
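  • maxDoP (maximum degree of parallelism) is the number of partitions to create. GetPartitions returns an IList<IEnumerator<TSource>> with one enumerator per partition. The partitions are dynamic: each enumerator pulls the next available items from the shared source as it advances, so a partition that finishes its work quickly simply takes on more items.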
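A minimal sketch, assuming .NET 6 or later with top-level statements and 13 sample integers as the source, that shows what GetPartitions hands back. Draining the partitions one after another on a single thread also shows that they are not fixed slices: because each partition pulls from the shared source on demand, the first one drained keeps taking items until the source is empty, and the others come back empty.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

var source = Enumerable.Range(1, 13);
int maxDoP = 4;

// One enumerator per partition; all of them draw from the same shared source.
IList<IEnumerator<int>> partitions = Partitioner.Create(source).GetPartitions(maxDoP);
Console.WriteLine($"Partitions created: {partitions.Count}");

var seen = new List<int>();
for (int i = 0; i < partitions.Count; i++)
{
    using var partition = partitions[i]; // disposed at the end of each iteration
    var taken = new List<int>();
    while (partition.MoveNext())
        taken.Add(partition.Current);
    Console.WriteLine($"Partition {i}: [{string.Join(", ", taken)}]");
    seen.AddRange(taken);
}
```

When the partitions are drained concurrently, as in the rest of this post, the items spread across them instead.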

Running Partitions in Parallel

After creating the partitions, we want to process them in parallel. We begin by calling the AsParallel() method on the list of partitions:

Partitioner
    .Create(source)
    .GetPartitions(maxDoP)
    .AsParallel();

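This turns the IList<IEnumerator<TSource>> returned by GetPartitions into a ParallelQuery<IEnumerator<TSource>>, so the LINQ operators applied next (such as Select) run on PLINQ worker threads. Note that the concurrency limit comes from the number of partitions rather than from PLINQ: each partition is drained by a single asynchronous loop, so at most maxDoP items are processed at any one time.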
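A minimal sketch, assuming .NET 6 or later and a 13-item integer source, that makes the types visible: GetPartitions(...).AsParallel() is a ParallelQuery<IEnumerator<int>>, and each PLINQ worker drains the partitions it receives. Here the "work" is only counting items, so the sum across all partitions should equal the size of the source.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

var source = Enumerable.Range(1, 13);
int maxDoP = 4;

// IList<IEnumerator<int>> becomes ParallelQuery<IEnumerator<int>>.
ParallelQuery<IEnumerator<int>> query =
    Partitioner.Create(source).GetPartitions(maxDoP).AsParallel();

// Each PLINQ worker drains and disposes the partitions it is handed;
// Sum() forces the query to execute.
int total = query
    .Select(partition =>
    {
        using (partition)
        {
            int count = 0;
            while (partition.MoveNext()) count++;
            return count;
        }
    })
    .Sum();

Console.WriteLine($"Items seen across all partitions: {total}");
```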

Creating a function to process each partition

We need a function that does the following:

  • Can be called for each partition
  • Disposes of the partition after use
  • Iterates over the partition items and invokes a user-defined asynchronous method for each item

This can be achieved with the following code:

async Task AwaitPartition(IEnumerator<T> partition)
{
    // funcBody (a Func<T, Task>) is supplied by the enclosing method
    using (partition)
    {
        while (true)
        {
            bool moveNext = partition.MoveNext();
            T current = moveNext ? partition.Current : default;

            if (!moveNext) break;

            await Task.Yield(); // Allow fair scheduling

            await funcBody(current); // Call user-defined async function
        }
    }
}

Explanation:

  • The partition is passed as a parameter to the function.
  • By using the using statement, we ensure that the partition (an enumerator) is properly disposed of after use.
  • We iterate through the partition using MoveNext(). If it returns false, it means there are no more items to process, so we exit the loop.
  • For each item, we call the user-defined async method funcBody(current).
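  • Before calling funcBody, we await Task.Yield(). This forces the method to return to its caller at that point and queue the rest of the iteration as a continuation (on the thread pool, when there is no synchronization context). As a result, funcBody never runs synchronously on the thread that started the partition, even if it does CPU-bound work before its first await, so one partition cannot hold on to that thread and delay the others from starting. If the thread pool has spare capacity, the continuation starts almost immediately.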
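The sketch below, assuming .NET 6 or later, runs the same loop over a single five-item partition. To make it compile on its own, funcBody is passed in as a parameter instead of being captured from an enclosing method, and the sample callback records the processing order and how many calls overlap. Because each item is awaited before MoveNext is called again, one partition never has more than one item in flight.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Same loop as AwaitPartition, except that funcBody is a parameter
// so that this sketch compiles on its own.
async Task AwaitPartition<T>(IEnumerator<T> partition, Func<T, Task> funcBody)
{
    using (partition)
    {
        while (true)
        {
            bool moveNext = partition.MoveNext();
            T current = moveNext ? partition.Current : default;
            if (!moveNext) break;

            await Task.Yield();      // continue as a queued continuation
            await funcBody(current); // the next item is not taken until this completes
        }
    }
}

var order = new List<int>();
int inFlight = 0, peak = 0;
object gate = new();

IEnumerable<int> items = new[] { 1, 2, 3, 4, 5 };

await AwaitPartition(items.GetEnumerator(), async x =>
{
    lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); order.Add(x); }
    await Task.Delay(50); // simulate I/O
    lock (gate) { inFlight--; }
});

Console.WriteLine($"Order: {string.Join(", ", order)}; peak in flight: {peak}");
```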

Handling Exceptions: Using AggregateException

When tasks run in parallel, exceptions can occur in one or more of them. Rather than stopping execution immediately when a single task fails, it's often better to collect individual exceptions and throw a single AggregateException after all tasks have completed.

To achieve this, we can use a thread-safe ConcurrentBag<Exception> to store any exceptions that occur while tasks are executing in parallel. Once all tasks are finished, we can check this bag and throw an AggregateException if any errors were captured.
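A minimal sketch of the collection step on its own, assuming .NET 6 or later: ten tasks run concurrently on the thread pool, the odd-numbered ones fail on purpose, and every failure is added to the shared ConcurrentBag<Exception> instead of escaping its task. The failing items are made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

var exceptions = new ConcurrentBag<Exception>();

// Ten concurrent tasks; the odd-numbered ones fail on purpose.
var tasks = Enumerable.Range(1, 10).Select(i => Task.Run(() =>
{
    try
    {
        if (i % 2 == 1)
            throw new InvalidOperationException($"Item {i} failed");
    }
    catch (Exception ex)
    {
        exceptions.Add(ex); // ConcurrentBag.Add is safe to call from many threads
    }
}));

// Completes successfully: every exception was caught inside its own task.
await Task.WhenAll(tasks);

Console.WriteLine($"Failures captured: {exceptions.Count}");
if (!exceptions.IsEmpty)
{
    var aggregate = new AggregateException(exceptions);
    Console.WriteLine($"AggregateException wraps {aggregate.InnerExceptions.Count} exceptions");
}
```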

Here’s how this can be implemented:

var exceptions = new ConcurrentBag<Exception>();

async Task AwaitPartition(IEnumerator<T> partition)
{
    using (partition)
    {
        while (true)
        {
            bool moveNext;
            T current;

            try
            {
                moveNext = partition.MoveNext();
                current = moveNext ? partition.Current : default;
            }
            catch (Exception ex)
            {
                exceptions.Add(ex);
                break;
            }

            if (!moveNext) break;

            await Task.Yield(); // Yield control to ensure fair task scheduling

            try
            {
                await funcBody(current); // Call user-defined async function
            }
            catch (Exception ex)
            {
                exceptions.Add(ex);
            }
        }
    }
}

Next, we create the tasks and process the partitions in parallel:

var tasks = Partitioner.Create(source)
    .GetPartitions(maxDoP)
    .AsParallel()
    .Select(partition => AwaitPartition(partition));

await Task.WhenAll(tasks);

Finally, after all tasks are complete, we check if any exceptions were recorded:

if (!exceptions.IsEmpty)
    throw new AggregateException(exceptions);
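One detail worth knowing about this final throw: because it happens inside an async method, the returned task faults with our AggregateException as its single inner exception. await rethrows that inner exception as-is, so a caller can catch AggregateException and read the individual failures from InnerExceptions, whereas blocking with .Wait() wraps it in one more AggregateException. A minimal sketch, assuming .NET 6 or later; FailTwiceAsync is a hypothetical stand-in for a method that ends with the check above.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a method that ends with the exceptions check.
async Task FailTwiceAsync()
{
    await Task.Yield();
    throw new AggregateException(
        new InvalidOperationException("first failure"),
        new TimeoutException("second failure"));
}

int awaitedCount = 0;
try
{
    await FailTwiceAsync();
}
catch (AggregateException ae)
{
    // await rethrows the task's single inner exception: the AggregateException we threw.
    awaitedCount = ae.InnerExceptions.Count;
    foreach (var inner in ae.InnerExceptions)
        Console.WriteLine($"{inner.GetType().Name}: {inner.Message}");
}

bool wrappedByWait = false;
try
{
    FailTwiceAsync().Wait();
}
catch (AggregateException outer)
{
    // Wait() throws the task's own AggregateException, which wraps ours.
    wrappedByWait = outer.InnerExceptions.Count == 1
        && outer.InnerException is AggregateException;
}

Console.WriteLine($"await: {awaitedCount} inner exceptions; Wait() adds a wrapper: {wrappedByWait}");
```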

Making the method accessible

To make this method reusable and easy to access, we define it as an extension method on IEnumerable<T> within a static class. This allows you to call it directly on any collection.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ParallelExtensions
{
    public static async Task ParallelAsync<T>(
        this IEnumerable<T> source,
        Func<T, Task> funcBody,
        int maxDoP = 4
    )
    {
        var exceptions = new ConcurrentBag<Exception>();

        async Task AwaitPartition(IEnumerator<T> partition)
        {
            using (partition)
            {
                while (true)
                {
                    bool moveNext;
                    T current;

                    try
                    {
                        moveNext = partition.MoveNext();
                        current = moveNext ? partition.Current : default;
                    }
                    catch (Exception ex)
                    {
                        exceptions.Add(ex);
                        break;
                    }

                    if (!moveNext) break;

                    await Task.Yield();

                    try
                    {
                        await funcBody(current);
                    }
                    catch (Exception ex)
                    {
                        exceptions.Add(ex);
                    }
                }
            }
        }

        var tasks = Partitioner.Create(source)
            .GetPartitions(maxDoP)
            .AsParallel()
            .Select(partition => AwaitPartition(partition));

        await Task.WhenAll(tasks);

        if (!exceptions.IsEmpty)
            throw new AggregateException(exceptions);
    }
}
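To check both the concurrency limit and the error handling, the sketch below (assuming .NET 6 or later) copies ParallelAsync into a local function, because an extension method must live in a static class and that would complicate a single-file top-level program. Twenty items are processed with maxDoP set to 4, and the items divisible by 5 fail on purpose. Since each partition awaits one item before taking the next, the peak number of calls in flight cannot exceed 4, and the failures are reported together only after the remaining items have finished.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Local-function copy of ParallelAsync with the same logic as the extension
// method, so the sketch compiles as one top-level program.
async Task ParallelAsync<T>(IEnumerable<T> source, Func<T, Task> funcBody, int maxDoP = 4)
{
    var exceptions = new ConcurrentBag<Exception>();

    async Task AwaitPartition(IEnumerator<T> partition)
    {
        using (partition)
        {
            while (true)
            {
                bool moveNext;
                T current;
                try
                {
                    moveNext = partition.MoveNext();
                    current = moveNext ? partition.Current : default;
                }
                catch (Exception ex) { exceptions.Add(ex); break; }

                if (!moveNext) break;
                await Task.Yield();

                try { await funcBody(current); }
                catch (Exception ex) { exceptions.Add(ex); }
            }
        }
    }

    await Task.WhenAll(Partitioner.Create(source)
        .GetPartitions(maxDoP)
        .AsParallel()
        .Select(partition => AwaitPartition(partition)));

    if (!exceptions.IsEmpty)
        throw new AggregateException(exceptions);
}

int inFlight = 0, peak = 0, done = 0, caught = 0;
object gate = new();

try
{
    await ParallelAsync(Enumerable.Range(1, 20), async item =>
    {
        lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
        try
        {
            await Task.Delay(100); // simulate I/O
            if (item % 5 == 0)     // items 5, 10, 15 and 20 fail
                throw new InvalidOperationException($"Item {item} failed");
            lock (gate) { done++; }
        }
        finally
        {
            lock (gate) { inFlight--; }
        }
    }, maxDoP: 4);
}
catch (AggregateException ae)
{
    caught = ae.InnerExceptions.Count;
}

Console.WriteLine($"Succeeded: {done}, failed: {caught}, peak concurrency: {peak} (limit 4)");
```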

Demonstrating the ParallelAsync<T> method call

In the following example, a list of integers represents the request items, and an asynchronous function reports whether each number is even or odd. The function is invoked once for every item, with at most four items processed at a time.

// List of request items
var requestItems = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13 };

// User-defined async function to process each item
Func<int, Task> func = async (x) =>
{
    // Simulate some asynchronous work
    await Task.Delay(1000);

    if (x % 2 == 0)
        Console.WriteLine($"{x} is an even number");
    else
        Console.WriteLine($"{x} is an odd number");
};

// Use the extension method to process items in parallel
// with a maximum degree of parallelism set to 4
await requestItems.ParallelAsync(func, maxDoP: 4);

[Image: output preview of the example run]

Benefits of the ParallelAsync Method We Defined Earlier

  • Scalable: At most maxDoP items are processed at any moment, so a large input no longer means thousands of simultaneous operations.
  • Resilient: A failing item does not stop the others; all exceptions are captured and rethrown together as one AggregateException, which makes centralized error handling easier.
  • Reusable: Works with any IEnumerable<T>; simply pass in your async function.
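For comparison, .NET 6 and later include a built-in Parallel.ForEachAsync that enforces the same kind of limit through ParallelOptions.MaxDegreeOfParallelism. The sketch below, assuming .NET 6 or later, reruns the even/odd example with it and tracks the peak number of items in flight. One design difference matters for the Resilient point: Parallel.ForEachAsync stops starting new items once an iteration throws, whereas ParallelAsync keeps processing the rest and reports every failure at the end.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

int inFlight = 0, peak = 0, done = 0;
object gate = new();

var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };

// Built-in alternative (.NET 6+): the body receives a CancellationToken
// and returns a ValueTask.
await Parallel.ForEachAsync(Enumerable.Range(1, 13), options, async (x, cancellationToken) =>
{
    lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
    await Task.Delay(100, cancellationToken); // simulate some asynchronous work
    Console.WriteLine(x % 2 == 0 ? $"{x} is an even number" : $"{x} is an odd number");
    lock (gate) { inFlight--; done++; }
});

Console.WriteLine($"Processed {done} items; peak concurrency {peak} (limit 4)");
```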

Conclusion

By combining data partitioning, asynchronous execution, and robust error handling, this approach provides a clean and dependable way to process large datasets concurrently without compromising readability or stability.
