How to use the Task Parallel Library (TPL) with load balancing and a limited degree of parallelism?

My task is to write a known number of values to an external system using an (async) interface. I have to limit the maximum number of writes that run concurrently. I also need load balancing, because this external system may take longer to write some values than others.

I know how to solve each of these problems individually:

Parallelism degree:

new ParallelOptions {MaxDegreeOfParallelism = maxNrParallelWrites}

      

I also came across this article: http://msdn.microsoft.com/en-us/library/ee789351(v=vs.110).aspx

Load balancing:

var partitioner = Partitioner.Create(values.ToList(), true);

      

Task from the asynchronous interface:

var writeTask = Task<AccessResult>.Factory.FromAsync(BeginWriteValue, EndWriteValue, value.SystemId, value.Xml, priority, null);

      



But what is the right way to combine all these methods? I created the following code:

  int maxNrParallelWrites = GetMaxNrParallelWrites();
  var partitioner = Partitioner.Create(values.ToList(), true);
  Parallel.ForEach(partitioner, new ParallelOptions {MaxDegreeOfParallelism = maxNrParallelWrites},
    (val) =>
    {
      var writeValueTask = GetWriteValueTask(val, priority);
      Task.WaitAny(writeValueTask);
    });

      

I am particularly unsure about the last part of the previous code: the action that does the workload. Would it be better, instead of creating a write task, to use the synchronous interface directly, like this:

(val) =>
    {
      var accessResult = externalSystem.WriteValue(....);
    }

      

Or is it okay to create a task and then wait for it (Task.WaitAny(...))?

+3




2 answers


You should use TPL Dataflow's ActionBlock, which encapsulates all of that for you. It is an actor-based framework that is part of the TPL:

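// Requires: using System.Threading.Tasks.Dataflow;
var block = new ActionBlock<Value>(
    // The delegate returns a Task, so the block awaits each write.
    value => GetWriteValueTask(value, priority),
    new ExecutionDataflowBlockOptions
    {
        // At most this many writes are in flight at once.
        MaxDegreeOfParallelism = GetMaxNrParallelWrites()
    });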

foreach (var value in values)
{
    block.Post(value);
}

      



You can set MaxDegreeOfParallelism and BoundedCapacity, and load balancing is baked in, because the block only processes MaxDegreeOfParallelism items at a time and picks up the next item whenever one finishes (as opposed to using a Partitioner that pre-splits the collection).
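The options described above can be put together roughly as follows. This is only a sketch under assumptions: `DataflowWriter`, `WriteAllAsync` and the `writeAsync` delegate are hypothetical names (the delegate stands in for `GetWriteValueTask(value, priority)`), and the `BoundedCapacity` value is an arbitrary illustrative choice. Depending on the target framework, the `System.Threading.Tasks.Dataflow` NuGet package may be needed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowWriter
{
    // writeAsync is a hypothetical stand-in for GetWriteValueTask(value, priority).
    public static async Task WriteAllAsync<T>(
        IEnumerable<T> values, int maxParallelWrites, Func<T, Task> writeAsync)
    {
        var block = new ActionBlock<T>(
            writeAsync,
            new ExecutionDataflowBlockOptions
            {
                // Upper limit on writes that are in flight at the same time.
                MaxDegreeOfParallelism = maxParallelWrites,
                // Assumption: also cap the number of buffered items so the
                // producer loop below is throttled instead of queueing everything.
                BoundedCapacity = maxParallelWrites * 2
            });

        foreach (var value in values)
        {
            // SendAsync waits asynchronously while the block is full;
            // Post would return false immediately in that situation.
            await block.SendAsync(value);
        }

        block.Complete();        // signal that no more items will arrive
        await block.Completion;  // completes (or faults) once all writes are done
    }
}
```

Awaiting `block.Completion` is what tells the caller that every queued write has finished; without it, the `foreach` loop only hands the items over to the block.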

Note: when you start an async task and then wait for it to complete synchronously (i.e. with Task.WaitAny), nothing is really asynchronous, because the calling thread is blocked for the whole duration. You should use Task.WhenAny instead in such cases.
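To illustrate the note above, here is a minimal sketch of throttling with `await Task.WhenAny` instead of a blocking `Task.WaitAny`. It is not taken from the question or from any library: `ThrottledWrites`, `RunAsync` and `body` are hypothetical names, and `body` stands in for something like `GetWriteValueTask(value, priority)`.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ThrottledWrites
{
    // Runs body for every item while keeping at most maxInFlight tasks pending.
    public static async Task RunAsync<T>(
        IEnumerable<T> items, int maxInFlight, Func<T, Task> body)
    {
        var inFlight = new List<Task>();

        foreach (var item in items)
        {
            if (inFlight.Count >= maxInFlight)
            {
                // Asynchronously wait until any pending write finishes;
                // unlike Task.WaitAny, this does not block a thread.
                Task finished = await Task.WhenAny(inFlight);
                inFlight.Remove(finished);
                await finished; // rethrows if that write failed
            }

            // Start the next write as soon as a slot is free.
            inFlight.Add(body(item));
        }

        // Wait (asynchronously) for the remaining writes.
        await Task.WhenAll(inFlight);
    }
}
```

Because a new write starts as soon as any earlier one completes, slow values do not hold up the others, which gives the same kind of load balancing as the ActionBlock approach.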

+2




There is a good example of how to create a load-balanced ForEachAsync method in this article. I took out the Task.Run to avoid starting a new thread, and then the extension method becomes this:

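using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class Extensions
{
    // Drains one partition, awaiting each item before taking the next.
    public static async Task ExecuteInPartition<T>(IEnumerator<T> partition, Func<T, Task> body)
    {
        using (partition)
            while (partition.MoveNext())
                await body(partition.Current);
    }

    // Creates dop partitions over the source; each partition runs at most one body at a time.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select ExecuteInPartition(partition, body));
    }
}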

      

Usage

This example processes no more than 100 emails asynchronously at a time:

 // Process 100 emails at a time
 return emailsToProcess.ForEachAsync(100, ProcessSingleEmail);

      

+1

