How to use the Task Parallel Library (TPL) with load balancing and a limited degree of parallelism?
My task is to write a known number of values to an external system through an asynchronous interface. I have to limit the number of writes that run concurrently. I also need load balancing, because writing some values to this external system can take longer than writing others.
I know how to solve each of these problems individually:
Degree of parallelism:
new ParallelOptions {MaxDegreeOfParallelism = maxNrParallelWrites}
I also came across this article: http://msdn.microsoft.com/en-us/library/ee789351(v=vs.110).aspx
Load balancing:
var partitioner = Partitioner.Create(values.ToList(), true);
Creating a task from the asynchronous (Begin/End) interface:
var writeTask = Task<AccessResult>.Factory.FromAsync(BeginWriteValue, EndWriteValue, value.SystemId, value.Xml, priority, null);
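To show what FromAsync does with a Begin/End pair, here is a minimal sketch written as a C# 9+ top-level program. BeginWriteValue and EndWriteValue below are hypothetical stand-ins for the external system's interface, with simplified parameter and result types (bool instead of AccessResult); the real signatures are not shown in the question.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-ins for the external system's APM (Begin/End) pair.
// The simulated write completes after a short delay and reports success.
IAsyncResult BeginWriteValue(string systemId, string xml, int priority,
                             AsyncCallback callback, object state)
{
    var tcs = new TaskCompletionSource<bool>(state);
    Task.Delay(10).ContinueWith(_ =>
    {
        tcs.SetResult(true);          // the simulated write succeeded
        callback?.Invoke(tcs.Task);   // APM contract: invoke the callback when done
    });
    return tcs.Task;                  // Task<bool> implements IAsyncResult
}

bool EndWriteValue(IAsyncResult asyncResult) =>
    ((Task<bool>)asyncResult).GetAwaiter().GetResult();

// FromAsync adapts the Begin/End pair into a Task<bool>; no thread is blocked while waiting.
Task<bool> writeTask = Task<bool>.Factory.FromAsync(
    BeginWriteValue, EndWriteValue, "system-1", "<value/>", 1, null);

bool ok = await writeTask;
Console.WriteLine(ok); // True
```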
But what is the right way to combine all of these? I wrote the following code:
int maxNrParallelWrites = GetMaxNrParallelWrites();
var partitioner = Partitioner.Create(values.ToList(), true);
Parallel.ForEach(partitioner, new ParallelOptions { MaxDegreeOfParallelism = maxNrParallelWrites },
    (val) =>
    {
        var writeValueTask = GetWriteValueTask(val, priority);
        Task.WaitAny(writeValueTask);
    });
I am particularly unsure about the last part of this code: the action that does the actual work. Would it be better, instead of creating a WriteValueTask, to use the synchronous interface directly, like this:
(val) =>
{
    var accessResult = externalSystem.WriteValue(....);
}
Or is it okay to create a task and then wait for it (Task.WaitAny(...))?
You should be using the TPL Dataflow ActionBlock, which encapsulates all of this for you. TPL Dataflow is an actor-based framework that is part of the TPL:
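using System.Threading.Tasks.Dataflow;

var block = new ActionBlock<Value>(
    value => GetWriteValueTask(value, priority),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = GetMaxNrParallelWrites()
    });

foreach (var value in values)
{
    block.Post(value);
}

// Signal that no more values are coming, then wait for all writes to finish.
block.Complete();
await block.Completion;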
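A minimal runnable sketch of the ActionBlock approach, assuming the System.Threading.Tasks.Dataflow package is referenced (it ships as a NuGet package for .NET Framework). WriteValueAsync is a hypothetical stand-in for GetWriteValueTask that also records how many writes are in flight, so the run can confirm that no more than MaxDegreeOfParallelism writes overlap.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

const int maxParallelWrites = 3;   // stand-in for GetMaxNrParallelWrites()
object gate = new();
int inFlight = 0, maxObserved = 0;
var written = new ConcurrentBag<int>();

// Hypothetical async write to the external system. It tracks the highest number
// of writes running at the same time and simulates I/O with Task.Delay.
async Task WriteValueAsync(int item)
{
    lock (gate) { inFlight++; maxObserved = Math.Max(maxObserved, inFlight); }
    await Task.Delay(20);
    lock (gate) { inFlight--; }
    written.Add(item);
}

var block = new ActionBlock<int>(
    v => WriteValueAsync(v),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxParallelWrites });

foreach (var n in Enumerable.Range(1, 20))
    block.Post(n);

block.Complete();        // no more values will be posted
await block.Completion;  // completes after every posted write has finished

Console.WriteLine($"written={written.Count}, maxConcurrent={maxObserved}");
```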
You can set MaxDegreeOfParallelism, BoundedCapacity and other options, and load balancing is built in: the block processes only MaxDegreeOfParallelism items at a time, and as each one finishes it picks up the next (as opposed to a static Partitioner that pre-splits the collection into ranges).
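BoundedCapacity changes how posting behaves once the block is full. In this sketch (again assuming the Dataflow package; the release gate and the values are illustrative), Post declines an item when there is no room, while SendAsync waits asynchronously until the block can accept it. ActionBlock counts an item toward BoundedCapacity until that item has finished processing.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Gate that keeps the first write "in flight" until it is released.
var release = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var processed = new List<int>();

var block = new ActionBlock<int>(
    async v =>
    {
        await release.Task;               // simulated slow write
        lock (processed) processed.Add(v);
    },
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 1,
        BoundedCapacity = 1               // includes the item currently being processed
    });

bool firstAccepted = block.Post(1);       // true: the block has room
bool secondAccepted = block.Post(2);      // false: the block is full, so item 2 is declined
Task<bool> third = block.SendAsync(3);    // waits asynchronously for room instead of failing
bool thirdPendingBeforeRelease = !third.IsCompleted;

release.SetResult(true);                  // let the first write finish
bool thirdAccepted = await third;         // item 3 is accepted once item 1 is done

block.Complete();
await block.Completion;
Console.WriteLine(string.Join(",", processed)); // 1,3
```

With a bounded block, awaiting SendAsync in the posting loop gives natural back-pressure: the producer never gets more than BoundedCapacity items ahead of the writes.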
Note: when you start an asynchronous operation and then wait for it synchronously (e.g. with Task.WaitAny), nothing is really asynchronous, because the calling thread is blocked until the operation completes. In such cases use await Task.WhenAny (or simply await the task) instead.
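The difference between blocking and awaiting shows up in a small sketch. The pending write is a hypothetical TaskCompletionSource standing in for the external system; because the method awaits Task.WhenAny instead of calling Task.WaitAny, it hands back an incomplete task immediately rather than holding the caller's thread until the write finishes.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical pending write, completed later by the "external system".
var pendingWrite = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);

async Task<string> WriteAndReportAsync(Task<string> write)
{
    // The calling thread is released here; the method resumes when the write completes.
    // (For a single task, "await write" would behave the same way.)
    Task<string> finished = await Task.WhenAny(write);
    return await finished;   // unwrap the result, or rethrow the write's exception
}

Task<string> report = WriteAndReportAsync(pendingWrite.Task);
bool returnedBeforeWriteCompleted = !report.IsCompleted;   // the caller was not blocked

pendingWrite.SetResult("ok");
string result = await report;
Console.WriteLine(result); // ok
```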
There is a good example of how to write a load-balancing ForEachAsync method in this article. ... I took out Task.Run to avoid queuing each partition onto a separate thread-pool thread, and the extension method then becomes:
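using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class Extensions
{
    // Processes one partition sequentially, awaiting each item before taking the next.
    private static async Task ExecuteInPartition<T>(IEnumerator<T> partition, Func<T, Task> body)
    {
        using (partition)
            while (partition.MoveNext())
                await body(partition.Current);
    }

    // Runs dop partitions concurrently, so at most dop bodies are in flight at once.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select ExecuteInPartition(partition, body));
    }
}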
Usage
This example processes at most 100 emails concurrently:
// Process 100 emails at a time
return emailsToProcess.ForEachAsync(100, ProcessSingleEmail);
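A self-contained sketch of the usage above. It repeats the extension's logic as local functions so that it compiles on its own (C# 9+ top-level program), and ProcessSingleEmail and the email list are hypothetical stand-ins that record how many items are in flight. A limit of 5 is used instead of 100 so that the limit matters for a short list of 50 items.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Same logic as the ForEachAsync extension, written as local functions.
async Task ExecuteInPartition<T>(IEnumerator<T> partition, Func<T, Task> body)
{
    using (partition)
        while (partition.MoveNext())
            await body(partition.Current);
}

Task ForEachAsync<T>(IEnumerable<T> source, int dop, Func<T, Task> body) =>
    Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(dop)
        select ExecuteInPartition(partition, body));

object gate = new();
int inFlight = 0, maxObserved = 0;
var processed = new ConcurrentBag<string>();

// Hypothetical stand-in for ProcessSingleEmail: tracks concurrency, simulates I/O.
async Task ProcessSingleEmail(string email)
{
    lock (gate) { inFlight++; maxObserved = Math.Max(maxObserved, inFlight); }
    await Task.Delay(10);
    lock (gate) { inFlight--; }
    processed.Add(email);
}

var emailsToProcess = Enumerable.Range(1, 50).Select(i => $"email-{i}@example.com").ToList();

// Process at most 5 emails at a time.
await ForEachAsync(emailsToProcess, 5, ProcessSingleEmail);
Console.WriteLine($"processed={processed.Count}, maxConcurrent={maxObserved}");
```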