Process n items from a BlockingCollection at the same time - continue as soon as one completes
I have the following scenario:

1. I take 50 jobs from the database into a BlockingCollection.
2. Each job is (potentially) long-running, so I want to run each one on a separate thread. (I know it might be better to use Task.WhenAll and let the TPL figure it out, but I want to control how many run at the same time.)
3. Let's say I want to run 5 of them at the same time (configurable).
4. I create 5 tasks (TPL), one for each job, and run them in parallel.
5. As soon as one of the jobs from step 4 completes, I want to pull the next job from the BlockingCollection, and keep going until all 50 are done.
I am thinking of creating a static BlockingCollection and a TaskCompletionSource that gets signalled when a job finishes, so the consumer can then fetch the next job from the queue, one at a time. I would also like to use async/await inside each job, but I'm not sure whether that affects this approach.
Is this the right way to accomplish what I'm trying to do?
This is similar to this question, except that I want to start the next job as soon as any one of the first N items completes, not after all N have completed.
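To make it concrete, this is roughly the shape I had in mind (just an untested sketch; Handler.HandleJob and GetJobs are my own methods):

var jobs = new BlockingCollection<Job>();

foreach (var job in GetJobs())          // the 50 jobs from the database
    jobs.Add(job);
jobs.CompleteAdding();                  // no more jobs will be added

// 5 consumers; each one pulls the next job as soon as it finishes its current one.
var consumers = Enumerable.Range(0, 5)
    .Select(_ => Task.Run(() =>
    {
        foreach (var job in jobs.GetConsumingEnumerable())
            Handler.HandleJob(job);
    }))
    .ToArray();

Task.WaitAll(consumers);                // all 50 jobs are done here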
Update:
OK, I now have a piece of code that does exactly what I want, in case someone wants to use it later. As you can see below, 5 threads are created, and each thread starts the next job as soon as it finishes its current one, so only 5 jobs are active at any given time. I understand this may not always behave exactly like this, and that there would be context-switching overhead if it were run on a single CPU/core.
var block = new ActionBlock<Job>(
    job => Handler.HandleJob(job),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

foreach (Job j in GetJobs())
    block.SendAsync(j);
Job 2 started on thread: 13. Timeout: 3600000ms. Time: 8/29/2014 3:14:43 PM
Job 4 started on thread: 14. Timeout: 15000ms. Time: 8/29/2014 3:14:43 PM
Job 0 started on thread: 7. Timeout: 600000ms. Time: 8/29/2014 3:14:43 PM
Job 1 started on thread: 12. Timeout: 900000ms. Time: 8/29/2014 3:14:43 PM
Job 3 started on thread: 11. Timeout: 120000ms. Time: 8/29/2014 3:14:43 PM
Job 4 completed on thread: 14. 8/29/2014 3:14:58 PM
Job 5 started on thread: 14. Timeout: 1800000ms. Time: 8/29/2014 3:14:58 PM
Job 3 completed on thread: 11. 8/29/2014 3:16:43 PM
Job 6 started on thread: 11. Timeout: 1200000ms. Time: 8/29/2014 3:16:43 PM
Job 0 completed on thread: 7. 8/29/2014 3:24:43 PM
Job 7 started on thread: 7. Timeout: 30000ms. Time: 8/29/2014 3:24:43 PM
Job 7 completed on thread: 7. 8/29/2014 3:25:13 PM
Job 8 started on thread: 7. Timeout: 100000ms. Time: 8/29/2014 3:25:13 PM
Job 8 completed on thread: 7. 8/29/2014 3:26:53 PM
Job 9 started on thread: 7. Timeout: 900000ms. Time: 8/29/2014 3:26:53 PM
Job 1 completed on thread: 12. 8/29/2014 3:29:43 PM
Job 10 started on thread: 12. Timeout: 300000ms. Time: 8/29/2014 3:29:43 PM
Job 10 completed on thread: 12. 8/29/2014 3:34:43 PM
Job 11 started on thread: 12. Timeout: 600000ms. Time: 8/29/2014 3:34:43 PM
Job 6 completed on thread: 11. 8/29/2014 3:36:43 PM
Job 12 started on thread: 11. Timeout: 300000ms. Time: 8/29/2014 3:36:43 PM
Job 12 completed on thread: 11. 8/29/2014 3:41:43 PM
Job 13 started on thread: 11. Timeout: 100000ms. Time: 8/29/2014 3:41:43 PM
Job 9 completed on thread: 7. 8/29/2014 3:41:53 PM
Job 14 started on thread: 7. Timeout: 300000ms. Time: 8/29/2014 3:41:53 PM
Job 13 completed on thread: 11. 8/29/2014 3:43:23 PM
Job 11 completed on thread: 12. 8/29/2014 3:44:43 PM
Job 5 completed on thread: 14. 8/29/2014 3:44:58 PM
Job 14 completed on thread: 7. 8/29/2014 3:46:53 PM
Job 2 completed on thread: 13. 8/29/2014 4:14:43 PM
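One thing the snippet above doesn't show is how to know when all 50 jobs are finished. My understanding is that you can complete the block and wait on its Completion task, roughly like this (untested):

block.Complete();           // tell the block that no more jobs will be posted
block.Completion.Wait();    // blocks until every queued job has been handled
// or, from an async method: await block.Completion;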
You can easily achieve what you need using TPL Dataflow.
What you can do is use a BufferBlock<T>, which is a buffer for storing your data, and link it to an ActionBlock<T>, which will consume the requests as they arrive from the BufferBlock<T>.
Now, the beauty of this is that you can specify how many requests the ActionBlock<T> should process concurrently, using the ExecutionDataflowBlockOptions class.
Here is a simplified console version that processes a bunch of numbers as they come in, printing each number and the Thread.CurrentThread.ManagedThreadId that handled it:
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks.Dataflow;

private static void Main(string[] args)
{
    var bufferBlock = new BufferBlock<int>();

    // Process at most 5 numbers concurrently.
    var actionBlock = new ActionBlock<int>(
        i => Console.WriteLine("Reading number {0} in thread {1}",
                               i, Thread.CurrentThread.ManagedThreadId),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

    // Everything posted to the buffer flows into the action block.
    bufferBlock.LinkTo(actionBlock);

    Produce(bufferBlock);

    Console.ReadKey();
}

private static void Produce(BufferBlock<int> bufferBlock)
{
    foreach (var num in Enumerable.Range(0, 500))
    {
        bufferBlock.Post(num);
    }
}
You can also post items asynchronously if needed, using the awaitable BufferBlock<T>.SendAsync. That way you let TPL Dataflow handle all of the throttling for you, without having to do it manually.
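For example, an asynchronous producer could look roughly like this (just a sketch; the BoundedCapacity value is an arbitrary choice to illustrate back-pressure, not something the question requires):

// Create the buffer with a bounded capacity so SendAsync actually waits
// when the buffer is full:
// var bufferBlock = new BufferBlock<int>(
//     new DataflowBlockOptions { BoundedCapacity = 100 });
private static async Task ProduceAsync(BufferBlock<int> bufferBlock)
{
    foreach (var num in Enumerable.Range(0, 500))
    {
        // Awaiting SendAsync throttles the producer until there is room again.
        await bufferBlock.SendAsync(num);
    }
    bufferBlock.Complete();
}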
You could use BlockingCollection and it would work fine, but it was built before async-await, so it blocks synchronously, which makes it less scalable in most cases.
You're better off using the async-ready TPL Dataflow, as Yuval Itschakov suggested. All you need is an ActionBlock that processes each item concurrently with a MaxDegreeOfParallelism of 5, and you send your work to it either synchronously (block.Post(item)) or asynchronously (await block.SendAsync(item)):
private static void Main()
{
    // Handle at most 5 jobs concurrently; the next job starts
    // as soon as any running one completes.
    var block = new ActionBlock<Job>(
        async job => await job.ProcessAsync(),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

    for (var i = 0; i < 50; i++)
    {
        block.Post(new Job());
    }

    Console.ReadKey();
}
You can also do it using SemaphoreSlim, as in this answer, or using ForEachAsync, as in this answer.
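For reference, a minimal sketch of the SemaphoreSlim variant (assuming each job exposes an async ProcessAsync method, as in the answer above, and that this runs inside an async method):

var throttler = new SemaphoreSlim(5);   // at most 5 jobs in flight at once

var tasks = GetJobs().Select(async job =>
{
    await throttler.WaitAsync();        // wait for a free slot
    try
    {
        await job.ProcessAsync();       // the actual (potentially long) work
    }
    finally
    {
        throttler.Release();            // free the slot so the next job can start
    }
});

await Task.WhenAll(tasks);              // completes when all 50 jobs are done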