How to use the Task Parallel Library (TPL) with load balancing and a limited degree of parallelism?

My task is to write a known number of values to an external system through an asynchronous interface. I have to limit the maximum number of writes that run concurrently. I also need load balancing, because the external system may take longer to write some values than others.

I know how to solve each of these problems individually:

Degree of parallelism:

new ParallelOptions {MaxDegreeOfParallelism = maxNrParallelWrites}


I also came across this article:

Load balancing:

var partitioner = Partitioner.Create(values.ToList(), true);


Creating a task from the asynchronous interface:

var writeTask = Task<AccessResult>.Factory.FromAsync(BeginWriteValue, EndWriteValue, value.SystemId, value.Xml, priority, null);
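For readers unfamiliar with FromAsync, here is a minimal, self-contained sketch of what that call does. The BeginWriteValue/EndWriteValue pair below is hypothetical: it fakes the write and returns a string instead of the real AccessResult. Only Task.Factory.FromAsync and the Begin/End (APM) shape come from the question.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical APM pair standing in for the external system's interface.
// The real BeginWriteValue/EndWriteValue signatures may differ.
IAsyncResult BeginWriteValue(string systemId, string xml, int priority,
                             AsyncCallback callback, object? state)
{
    // Task<T> implements IAsyncResult; passing 'state' makes it the AsyncState.
    var task = Task.Factory.StartNew(_ => $"{systemId}:ok", state);
    if (callback != null)
        task.ContinueWith(t => callback(t)); // APM: signal completion via the callback
    return task;
}

// APM "End" method: returns the result, rethrowing any failure of the write.
string EndWriteValue(IAsyncResult asyncResult) =>
    ((Task<string>)asyncResult).GetAwaiter().GetResult();

// Wrap the Begin/End pair in a Task; no thread is blocked while the write is in flight.
Task<string> writeTask = Task<string>.Factory.FromAsync(
    BeginWriteValue, EndWriteValue, "sys-1", "<value/>", 1, null);

string result = await writeTask;
Console.WriteLine(result); // prints "sys-1:ok"
```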


But what is the right way to combine these approaches? I came up with the following code:

  int maxNrParallelWrites = GetMaxNrParallelWrites();
  var partitioner = Partitioner.Create(values.ToList(), true);
  Parallel.ForEach(partitioner, new ParallelOptions { MaxDegreeOfParallelism = maxNrParallelWrites },
    (val) =>
    {
      var writeValueTask = GetWriteValueTask(val, priority);
      Task.WaitAny(writeValueTask);
    });


I am particularly unsure about the last part of the code above: the action that does the actual work. Would it be better to use the synchronous interface directly instead of creating a write task, like this:

(val) =>
{
    var accessResult = externalSystem.WriteValue(....);
}


Or is it okay to create a task and then wait for it (Task.WaitAny(...))?



2 answers

You should be using TPL Dataflow's ActionBlock, which encapsulates all of this for you. TPL Dataflow is an actor-based framework that is part of the TPL:

var block = new ActionBlock<Value>(
    value => GetWriteValueTask(value, priority),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = GetMaxNrParallelWrites()
    });

foreach (var value in values)
{
    block.Post(value);
}

block.Complete();
await block.Completion;

You can set MaxDegreeOfParallelism and BoundedCapacity, and load balancing is built in: the block processes at most MaxDegreeOfParallelism items at a time, and as each one completes it picks up the next (as opposed to using a Partitioner, which splits the collection up front).
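A minimal sketch of the ActionBlock approach with both options set, assuming the System.Threading.Tasks.Dataflow NuGet package is referenced. WriteValueAsync is a hypothetical stand-in for GetWriteValueTask, and the limits (3 and 6) are arbitrary. With BoundedCapacity set, Post returns false once the block is full, so this loop uses SendAsync, which waits asynchronously until there is room.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

object gate = new();
int inFlight = 0, maxInFlight = 0, written = 0;

// Hypothetical stand-in for GetWriteValueTask: every fifth value is a slow write.
async Task WriteValueAsync(int value)
{
    lock (gate) { inFlight++; maxInFlight = Math.Max(maxInFlight, inFlight); }
    await Task.Delay(value % 5 == 0 ? 50 : 5);
    lock (gate) { inFlight--; written++; }
}

var block = new ActionBlock<int>(
    WriteValueAsync,
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 3, // at most 3 writes in flight
        BoundedCapacity = 6         // caps how many items the block holds at once
    });

foreach (var value in Enumerable.Range(1, 20))
{
    // Waits asynchronously while the block is full instead of dropping the item.
    await block.SendAsync(value);
}

block.Complete();       // signal that no more values will arrive
await block.Completion; // wait until every accepted write has finished
Console.WriteLine($"written {written}, max in flight {maxInFlight}");
```

A slow write only occupies one of the three slots; the other two keep pulling new values, which is the load-balancing behavior described above.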

Note: when you start an asynchronous operation and then wait for it synchronously (e.g. with Task.WaitAny), nothing is really asynchronous. In such cases you should await Task.WhenAny instead.
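To make the note concrete, here is a sketch of throttling with await Task.WhenAny instead of blocking on Task.WaitAny, using only the base library. WriteValueAsync and WriteAllAsync are hypothetical names, and the limit of 3 is arbitrary.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

object gate = new();
int inFlight = 0, maxInFlight = 0;

// Hypothetical stand-in for GetWriteValueTask: writes take 10, 20 or 30 ms.
async Task<int> WriteValueAsync(int value)
{
    lock (gate) { inFlight++; maxInFlight = Math.Max(maxInFlight, inFlight); }
    await Task.Delay(10 * (value % 3 + 1));
    lock (gate) { inFlight--; }
    return value;
}

// Keeps at most maxParallel writes running; a new one starts as soon as any finishes.
async Task<List<int>> WriteAllAsync(IEnumerable<int> values, int maxParallel)
{
    var results = new List<int>();
    var running = new List<Task<int>>();
    foreach (var value in values)
    {
        if (running.Count >= maxParallel)
        {
            // Await (not WaitAny): the calling thread is released while writes run.
            Task<int> finished = await Task.WhenAny(running);
            running.Remove(finished);
            results.Add(await finished); // surfaces the result or rethrows its exception
        }
        running.Add(WriteValueAsync(value));
    }
    results.AddRange(await Task.WhenAll(running)); // drain the remaining writes
    return results;
}

List<int> written = await WriteAllAsync(Enumerable.Range(1, 10), maxParallel: 3);
Console.WriteLine($"written {written.Count}, max in flight {maxInFlight}");
```

This is roughly what ActionBlock's MaxDegreeOfParallelism does for you, written out by hand.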



There is a good example of how to create a load-balancing ForEachAsync method in this article. ... I took out Task.Run to avoid starting a new thread, and the extension method then becomes this:

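using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class Extensions
{
    // Drains one partition sequentially, awaiting each item before taking the next.
    public static async Task ExecuteInPartition<T>(IEnumerator<T> partition, Func<T, Task> body)
    {
        using (partition)
            while (partition.MoveNext())
                await body(partition.Current);
    }

    // Runs 'dop' partitions concurrently, so at most 'dop' bodies are in flight at a time.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select ExecuteInPartition(partition, body));
    }
}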

The following example processes at most 100 emails asynchronously at a time:

 // Process 100 emails at a time
 return emailsToProcess.ForEachAsync(100, ProcessSingleEmail);
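A standalone sketch of the same pattern, with a hypothetical ProcessSingleEmail that records how many calls are in flight. The helpers are local functions here only so the file runs on its own. One change is an assumption of ours, not the article's: Partitioner.Create(source) buffers items in chunks by default, so this version passes EnumerablePartitionerOptions.NoBuffering to make each partition take one item at a time. A partition that is waiting on a slow item then holds back no other items.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

object gate = new();
int inFlight = 0, maxInFlight = 0;
var processed = new ConcurrentBag<int>();

// Same logic as ExecuteInPartition above: one partition, items awaited in sequence.
async Task ExecuteInPartition<T>(IEnumerator<T> partition, Func<T, Task> body)
{
    using (partition)
        while (partition.MoveNext())
            await body(partition.Current);
}

// ForEachAsync variant: NoBuffering (our choice) hands out one item per MoveNext.
Task ForEachAsync<T>(IEnumerable<T> source, int dop, Func<T, Task> body) =>
    Task.WhenAll(
        from partition in Partitioner.Create(source, EnumerablePartitionerOptions.NoBuffering)
                                     .GetPartitions(dop)
        select ExecuteInPartition(partition, body));

// Hypothetical ProcessSingleEmail: email 0 is slow, the rest are fast.
async Task ProcessSingleEmail(int email)
{
    lock (gate) { inFlight++; maxInFlight = Math.Max(maxInFlight, inFlight); }
    await Task.Delay(email == 0 ? 200 : 5);
    lock (gate) { inFlight--; }
    processed.Add(email);
}

await ForEachAsync(Enumerable.Range(0, 50), 4, ProcessSingleEmail);
Console.WriteLine($"processed {processed.Count}, max in flight {maxInFlight}");
```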



