Best practice for task / await in a foreach loop

Most of my time is spent in a foreach loop that uses task / await. It involves fetching data from the database, generating HTML, POSTing it to the API, and storing the responses in the DB.

The layout looks like this

List<label> labels = db.labels.ToList();
foreach (var x in list) 
{
    var myLabels = labels.Where(q => !db.filter.Where(y => x.userid == y.userid)
                                               .Select(y => y.ID)
                                               .Contains(q.id));

    //Render the HTML
    //do some fast stuff with objects

    List<response> res = await api.sendMessage(object);  //POST

    //put all the responses in the db
    foreach (var r in res) 
    {
        db.responses.Add(r);
    }

    db.SaveChanges();
}

      

Generating the HTML and POSTing it to the API seem to take most of the time.

Ideally, I would like to generate the HTML for the next element while waiting for the current POST to finish, and only then post the next element.

Other ideas are also welcome. How would you do it?

My first thought was to declare a Task above the foreach and await it before doing the next POST, but then how do I handle the last iteration of the loop ... it feels messy ...
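Roughly what I had in mind, as a sketch (RenderHtml and SaveResponses are placeholders for my real rendering and DB code):

Task<List<response>> pending = null;

foreach (var x in list)
{
    var objectWithHtml = RenderHtml(x);         // generate HTML for the current element

    if (pending != null)
    {
        SaveResponses(await pending);           // wait for the previous POST and store its responses
    }

    pending = api.sendMessage(objectWithHtml);  // start the POST for the current element
}

if (pending != null)
{
    SaveResponses(await pending);               // the "last loop" case that feels messy
}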

+3




4 answers


Here's what I ended up using (fooobar.com/questions/64069/...):

List<ToSend> sendToAPI = new List<ToSend>();
List<label> labels = db.labels.ToList();
foreach (var x in list) {
    var myLabels = labels.Where(q => !db.filter.Where(y => x.userid == y.userid)
                                               .Select(y => y.ID)
                                               .Contains(q.id));

    //Render the HTML
    //do some fast stuff with objects
    sendToAPI.Add(the object with HTML);
}

int maxParallelPOSTs=5;
await TaskHelper.ForEachAsync(sendToAPI, maxParallelPOSTs, async i => {
    using (NasContext db2 = new NasContext()) {
        List<response> res = await api.sendMessage(i.object);  //POST

        //put all the responses in the db
        foreach (var r in res) 
        {
            db2.responses.Add(r);
        }

        db2.SaveChanges();
    }
});





// Requires: using System; using System.Collections.Concurrent; using System.Collections.Generic;
//           using System.Linq; using System.Threading.Tasks;
public static class TaskHelper
{
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        // Split the source into "dop" partitions and run one task per partition,
        // so at most "dop" bodies are in flight at any time.
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                    {
                        await body(partition.Current).ContinueWith(t =>
                        {
                            if (t.Exception != null)
                            {
                                string problem = t.Exception.ToString();
                            }
                            // observe exceptions so a faulted body does not go unobserved
                        });
                    }
            }));
    }
}

      



This basically lets me generate the HTML synchronously, which is fine since it only takes a few seconds to generate 1000 items, while POSTing and saving to the DB run asynchronously with as many parallel tasks as I predefine. In this case I post to the Mandrill API, so concurrent messages are not an issue.

0




You can do it in parallel, but you need a separate context for each task.

Entity Framework is not thread safe, so you cannot use a single context across parallel tasks.

var tasks = myLabels.Select( async label=>{
    using(var db = new MyDbContext ()){
        // do processing...
        var response = await api.getresponse();
        db.Responses.Add(response);
        await db.SaveChangesAsync();
    } 
});

await Task.WhenAll(tasks);

      



In this case, all tasks will run in parallel, and each task will have its own context.

If you don't create a new context for each task, you will get the error mentioned in this question: Does Entity Framework support concurrent asynchronous requests?
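If you also want to cap how many requests hit the API at once, rather than starting them all at the same time, a SemaphoreSlim can throttle the same code. A sketch, with the limit of 5 chosen arbitrarily:

var throttle = new SemaphoreSlim(5);   // at most 5 concurrent POST + save operations

var tasks = myLabels.Select(async label =>
{
    await throttle.WaitAsync();
    try
    {
        using (var db = new MyDbContext())
        {
            // do processing...
            var response = await api.getresponse();
            db.Responses.Add(response);
            await db.SaveChangesAsync();
        }
    }
    finally
    {
        throttle.Release();
    }
});

await Task.WhenAll(tasks);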

+3




This is more of an architecture issue than a code issue here imo.

You can split your work into two distinct parts:

  • Get data from database and generate HTML
  • Submit API request and save response to database

You can run them both in parallel and use a queue to coordinate this: whenever your HTML is ready, it is added to the queue, and another worker picks it up from there, taking that HTML and submitting it to the API.

Both parts can themselves be multi-threaded; for example, you can process several items from the queue at the same time by having a set of workers pulling items from the queue, as in the sketch below.
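A minimal sketch of that layout, assuming a BlockingCollection<string> as the queue and hypothetical RenderHtml / PostAndSaveAsync helpers for the two halves of the work:

// Requires: using System.Collections.Concurrent; using System.Linq; using System.Threading.Tasks;
var queue = new BlockingCollection<string>(boundedCapacity: 100);

// Producer: fetch from the database and render HTML
var producer = Task.Run(() =>
{
    foreach (var label in db.labels.ToList())
    {
        queue.Add(RenderHtml(label));   // hand the rendered HTML to the workers
    }
    queue.CompleteAdding();             // no more items will arrive
});

// Consumers: a fixed set of workers that POST to the API and save the responses
var workers = Enumerable.Range(0, 4).Select(_ => Task.Run(async () =>
{
    foreach (var html in queue.GetConsumingEnumerable())
    {
        await PostAndSaveAsync(html);   // POST, then store the responses in the DB
    }
})).ToArray();

await producer;
await Task.WhenAll(workers);

The bounded capacity keeps the producer from racing too far ahead of the workers, and the worker count (4 here) controls how many API calls are in flight at once.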

+2




This cries out for the producer / consumer pattern: one producer produces data at a different rate than the consumer consumes it. Once the producer has nothing left to produce, it notifies the consumer that no more data is expected.

MSDN has a good example of this pattern, in which several dataflow blocks are chained together: the output of one block is the input of the next block.

Walkthrough: Creating a Dataflow Pipeline

The idea is this:

  • Create a class that will generate the HTML.
  • This class has an object of class System.Threading.Tasks.Dataflow.BufferBlock<T>.
  • An async routine creates all the HTML output and awaits SendAsync to put each item into the bufferBlock.
  • The buffer block implements the ISourceBlock<T> interface. The class exposes it via this get property:

Code:

class MyProducer<T>
{
    private System.Threading.Tasks.Dataflow.BufferBlock<T> bufferBlock = new BufferBlock<T>();

    public ISourceBlock<T> Output { get { return this.bufferBlock; } }

    public async Task ProcessAsync()
    {
        while (somethingToProduce)
        {
            T producedData = ProduceOutput(...);
            await this.bufferBlock.SendAsync(producedData);
        }
        // no data to send anymore. Mark the output complete:
        this.bufferBlock.Complete();
    }
}

      

  • The second class takes this ISourceBlock<T>. It waits on the source block until data arrives and processes it.
  • Do this in an async function.
  • Stop when no more data is expected.

Code:

public class MyConsumer<T>
{
    public ISourceBlock<T> Source { get; set; }

    public async Task ProcessAsync()
    {
        while (await this.Source.OutputAvailableAsync())
        {   // there is input of type T, read it:
            var input = await this.Source.ReceiveAsync();
            // process input
        }
        // if here, no more input is expected. Finish.
    }
}

      

Now compose this:

private async Task ProduceOutput()
{
    // assuming the produced items are HTML strings
    var producer = new MyProducer<string>();
    var consumer = new MyConsumer<string>() { Source = producer.Output };
    var producerTask = Task.Run(() => producer.ProcessAsync());
    var consumerTask = Task.Run(() => consumer.ProcessAsync());
    // while both tasks are working, do other things.
    // wait until both tasks are finished:
    await Task.WhenAll(producerTask, consumerTask);
}

      

For simplicity, I have left out exception handling and cancellation. They are described in these Stack Overflow questions:

Maintain UI with tasks, handle aggregated exception

Cancel Async Task or Task List

+1








