Process N items from a BlockingCollection at the same time - continue as soon as one completes

I have the following scenario.

  • I am taking 50 jobs from the database into a BlockingCollection.

  • Each job is (potentially) long-running, so I want to run them on separate threads. (I know it might be better to use Task.WhenAll and let the TPL figure it out, but I want to control how many run at the same time.)

  • Let's say I want to run 5 of them at the same time (configurable).

  • I create 5 tasks (TPL), one for each job, and run them in parallel.

I want to pick up the next job from the BlockingCollection as soon as one of the jobs from the previous step completes, and continue until all 50 are done.

I am thinking of creating a static BlockingCollection and a TaskCompletionSource that gets signaled when a job finishes, so the consumer can then fetch the next job from the queue. I would also like to use async/await for each job, but I'm not sure whether that affects the approach.
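
Roughly the shape I have in mind (just a sketch, using GetConsumingEnumerable to block the workers instead of a TaskCompletionSource; Job, GetJobs and Handler.HandleJob stand in for my own code):

// Sketch: load the 50 jobs, then let 5 workers drain the queue.
var queue = new BlockingCollection<Job>();

foreach (var job in GetJobs())
    queue.Add(job);
queue.CompleteAdding();

var workers = Enumerable.Range(0, 5)
    .Select(_ => Task.Run(() =>
    {
        // Each worker picks up the next job as soon as it finishes the current one.
        foreach (var job in queue.GetConsumingEnumerable())
            Handler.HandleJob(job);
    }))
    .ToArray();

Task.WaitAll(workers);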

Is this the correct way to accomplish what I am trying to do?

Similar to this link, with the catch that I want to process the next job as soon as one of the first N items completes, not after all N have finished.

Update:

OK, I now have a piece of code that does exactly what I want, in case someone wants to use it later. As you can see from the log below, at most 5 jobs run at any given time, and each thread picks up the next job as soon as it finishes its current one. I understand this may not behave exactly the same on every run, and that there will be context-switching overhead if it is used on a single CPU/core.

var block = new ActionBlock<Job>(
    job => Handler.HandleJob(job),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

foreach (Job j in GetJobs())
    block.SendAsync(j); // the block's capacity is unbounded, so the returned task completes immediately

      

Job 2 started in thread: 13. timeout: 3600000ms. Time: 8/29/2014 3:14:43 PM
Job 4 started in thread: 14. timeout: 15000ms. Time: 8/29/2014 3:14:43 PM
Job 0 started in thread: 7. timeout: 600000ms. Time: 8/29/2014 3:14:43 PM
Job 1 started in thread: 12. timeout: 900000ms. Time: 8/29/2014 3:14:43 PM
Job 3 started in thread: 11. timeout: 120000ms. Time: 8/29/2014 3:14:43 PM
Job 4 completed on thread: 14. 8/29/2014 3:14:58 PM
Job 5 started in thread: 14. timeout: 1800000ms. Time: 8/29/2014 3:14:58 PM
Job 3 completed on thread: 11. 8/29/2014 3:16:43 PM
Job 6 started in thread: 11. timeout: 1200000ms. Time: 8/29/2014 3:16:43 PM
Job 0 completed on thread: 7. 8/29/2014 3:24:43 PM
Job 7 started in thread: 7. timeout: 30000ms. Time: 8/29/2014 3:24:43 PM
Job 7 completed on thread: 7. 8/29/2014 3:25:13 PM
Job 8 started in thread: 7. timeout: 100000ms. Time: 8/29/2014 3:25:13 PM
Job 8 completed on thread: 7. 8/29/2014 3:26:53 PM
Job 9 started in thread: 7. timeout: 900000ms. Time: 8/29/2014 3:26:53 PM
Job 1 completed on thread: 12. 8/29/2014 3:29:43 PM
Job 10 started in thread: 12. timeout: 300000ms. Time: 8/29/2014 3:29:43 PM
Job 10 completed on thread: 12. 8/29/2014 3:34:43 PM
Job 11 started in thread: 12. timeout: 600000ms. Time: 8/29/2014 3:34:43 PM
Job 6 completed on thread: 11. 8/29/2014 3:36:43 PM
Job 12 started in thread: 11. timeout: 300000ms. Time: 8/29/2014 3:36:43 PM
Job 12 completed on thread: 11. 8/29/2014 3:41:43 PM
Job 13 started in thread: 11. timeout: 100000ms. Time: 8/29/2014 3:41:43 PM
Job 9 completed on thread: 7. 8/29/2014 3:41:53 PM
Job 14 started in thread: 7. timeout: 300000ms. Time: 8/29/2014 3:41:53 PM
Job 13 completed on thread: 11. 8/29/2014 3:43:23 PM
Job 11 completed on thread: 12. 8/29/2014 3:44:43 PM
Job 5 completed on thread: 14. 8/29/2014 3:44:58 PM
Job 14 completed on thread: 7. 8/29/2014 3:46:53 PM
Job 2 completed on thread: 13. 8/29/2014 4:14:43 PM





3 answers


You can easily achieve what you need using TPL Dataflow.

What you can do is use a BufferBlock<T>, which is a buffer for storing your data, and link it to an ActionBlock<T>, which will consume the requests as they arrive from the BufferBlock<T>.

Now, the beauty is that you can specify how many requests the ActionBlock<T> should process concurrently, using the ExecutionDataflowBlockOptions class.

Here's a simplified console version that processes a bunch of numbers as they come in and prints each number together with the Thread.CurrentThread.ManagedThreadId that processed it:



// Requires TPL Dataflow (the System.Threading.Tasks.Dataflow namespace, available via NuGet).
private static void Main(string[] args)
{
    var bufferBlock = new BufferBlock<int>();

    // Process at most 5 items concurrently.
    var actionBlock =
        new ActionBlock<int>(i => Console.WriteLine("Reading number {0} in thread {1}",
                                                    i, Thread.CurrentThread.ManagedThreadId),
                             new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

    // Anything posted to the buffer flows into the action block.
    bufferBlock.LinkTo(actionBlock);
    Produce(bufferBlock);

    Console.ReadKey();
}

private static void Produce(BufferBlock<int> bufferBlock)
{
    foreach (var num in Enumerable.Range(0, 500))
    {
        bufferBlock.Post(num);
    }
}

      

You can also post the items asynchronously if needed, using the awaitable BufferBlock<T>.SendAsync. This way you let TPL Dataflow handle all of the throttling for you, without having to do it manually.
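
For example, an asynchronous producer might look roughly like this (a sketch only; the block setup stays the same as above):

private static async Task ProduceAsync(BufferBlock<int> bufferBlock)
{
    foreach (var num in Enumerable.Range(0, 500))
    {
        // SendAsync completes once the block has accepted the item,
        // which is what provides back-pressure when the block is bounded.
        await bufferBlock.SendAsync(num);
    }
}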





You can use BlockingCollection and it will work fine, but it was built before async-await, so it blocks synchronously, which can be less scalable in most cases.

You're better off using the async-ready TPL Dataflow, as Yuval Itschakov suggested. All you need is an ActionBlock that processes each item with a MaxDegreeOfParallelism of 5, and you send your work to it either synchronously (block.Post(item)) or asynchronously (await block.SendAsync(item)):



private static void Main()
{
    var block = new ActionBlock<Job>(
        async job => await job.ProcessAsync(),
        new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 5});

    for (var i = 0; i < 50; i++)
    {
        block.Post(new Job());
    }

    Console.ReadKey();
}
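
If you want to wait until every posted job has finished, rather than blocking on Console.ReadKey, you can complete the block and wait on its Completion task (a small variation on the snippet above):

block.Complete();           // signal that no more items will be posted
block.Completion.Wait();    // or: await block.Completion;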

      





You can also do it using SemaphoreSlim, as in this answer, or using ForEachAsync, as in this answer.
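
For reference, the SemaphoreSlim variant looks roughly like this (just a sketch; Job and ProcessAsync are the placeholder types from the answer above, and RunJobsAsync is a made-up method name):

private static async Task RunJobsAsync(IEnumerable<Job> jobs, int maxConcurrency = 5)
{
    var throttler = new SemaphoreSlim(maxConcurrency);

    var tasks = jobs.Select(async job =>
    {
        await throttler.WaitAsync();    // wait for a free slot (at most maxConcurrency in flight)
        try
        {
            // The next waiting job starts as soon as this one releases its slot.
            await job.ProcessAsync();
        }
        finally
        {
            throttler.Release();
        }
    }).ToList();                        // materialize so every job is started

    await Task.WhenAll(tasks);
}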









