The best way to parallelize bulk email sending

I'm new to TPL (Task Parallel Library) and I'm having a hard time setting up a process to run tasks in parallel.

I am working on an application that sends bulk emails (on the order of thousands per minute), but when I look at processor usage, it is low. I am sure there is a lot of overhead because I am not using the task library correctly.

Here's my code:

public async void MainProcess()
{
    var batches = emailsToProcess.Batch(CONST_BATCHES_SIZE);

    foreach (var batch in batches.AsParallel()
        .WithDegreeOfParallelism(Environment.ProcessorCount))
    {
         await Task.WhenAll(from emailToProcess in batch 
                    select ProcessSingleEmail(emailToProcess));
        _emailsToProcessRepository.MarkBatchAsProcessed(batch);
    }
}

private async Task ProcessSingleEmail(EmailToProcess emailToProcess)
{
    try
    {
        MailMessage mail = GetMail(emailToProcess); //static light method
        await _smtpClient.SendAsync(mail);
        emailToProcess.Processed = true;
    }
    catch (Exception e)
    {
        _logger.Error(ErrorHelper.GetExceptionMessage(e, 
                    string.Format("Error sending Email ID #{0} : ", 
                    emailToProcess.Id)), e);
    }
}

      

(I know this may look terrible: please feel free to criticize ☺)

I need it to behave like this: the records are processed in batches (by the way, I am using a library that gives me the "Batch" method), and each batch must be marked as processed in the database once the process has finished sending it.

The process actually does what I want, apart from being slow as hell. And as perfmon shows, processor usage is not very high.

[Image: perfmon screenshot]

What's the best way to do this? Any advice?

EDIT: I understand that I have overhead problems. Is there any tool or easy way to detect and fix them?


1 answer


What you are doing is not CPU-bound but I/O-bound, so limiting the number of concurrent tasks to the number of CPUs is likely to hurt your performance. Try running more tasks in parallel.

For example, the code below processes all emails asynchronously but limits the work to 100 emails in flight at a time. It uses an extension method, ForEachAsync, to do the processing. This method takes the degree of parallelism as a parameter, so I would experiment with larger values for it.

You can also make the method MarkBatchAsProcessed asynchronous if possible, as it can also limit performance.

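using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class Extensions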
{
    public static async Task ExecuteInPartition<T>(IEnumerator<T> partition, Func<T, Task> body)
    {
        using (partition)
            while (partition.MoveNext())
                await body(partition.Current);
    }

    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select ExecuteInPartition(partition, body));
    }
}

public async Task MainProcess()
{
    // Keep at most 100 emails in flight at any time
    await emailsToProcess.ForEachAsync(100, m => ProcessSingleEmail(m));

    // Runs only after every email has been attempted
    _emailsToProcessRepository.MarkBatchAsProcessed(emailsToProcess);
}
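To check that the degree-of-parallelism parameter really caps the number of sends in flight, a small measurement like the one below can help. It is only a sketch: ThrottleDemo and MeasurePeakConcurrencyAsync are hypothetical names, the partition helper is repeated so the block compiles on its own, and Task.Delay stands in for the real SMTP call.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ThrottleDemo
{
    // Same partition-based helper as in the answer, repeated here
    // so that this sketch is self-contained.
    private static async Task ExecuteInPartition<T>(IEnumerator<T> partition, Func<T, Task> body)
    {
        using (partition)
            while (partition.MoveNext())
                await body(partition.Current);
    }

    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select ExecuteInPartition(partition, body));
    }

    // Runs `count` simulated sends and returns the highest number
    // that were in flight at the same moment.
    public static async Task<int> MeasurePeakConcurrencyAsync(int count, int dop)
    {
        var gate = new object();
        int inFlight = 0;
        int peak = 0;

        await Enumerable.Range(0, count).ForEachAsync(dop, async i =>
        {
            lock (gate)
            {
                inFlight++;
                if (inFlight > peak) peak = inFlight;
            }

            await Task.Delay(20); // assumption: stands in for the SMTP round trip

            lock (gate)
            {
                inFlight--;
            }
        });

        return peak;
    }
}
```

Calling await ThrottleDemo.MeasurePeakConcurrencyAsync(50, 5) should never report more than 5, because each of the 5 partitions awaits one item at a time. Swapping the delay for the real send and raising dop step by step is one way to find the point where throughput stops improving.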

      



You should also avoid async void methods: exceptions thrown inside them cannot be caught by the caller, and the methods cannot be awaited. They are meant mostly for event handlers, so I changed MainProcess to return Task.
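The difference between the two return types can be illustrated with a minimal sketch. All names here (AsyncReturnTypeDemo, FailAsync, SendAsTask, SendAsVoid) are hypothetical, and FailAsync simply stands in for a send that throws.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncReturnTypeDemo
{
    // Hypothetical operation standing in for a send that fails.
    private static async Task FailAsync()
    {
        await Task.Yield();
        throw new InvalidOperationException("send failed");
    }

    // async Task: the exception is stored in the returned Task
    // and rethrown when the caller awaits it.
    public static async Task SendAsTask()
    {
        await FailAsync();
    }

    // async void: shown for contrast only and never called in this sketch.
    // The caller gets nothing to await, so a try/catch around the call
    // cannot observe the exception.
    public static async void SendAsVoid()
    {
        await FailAsync();
    }

    // Returns true when the caller was able to catch the failure.
    public static async Task<bool> CallerCanObserveFailureAsync()
    {
        try
        {
            await SendAsTask();
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }
}
```

With the Task-returning version, CallerCanObserveFailureAsync completes with true. There is no equivalent for SendAsVoid, since it returns before the exception is thrown and hands back no Task.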

Update

The number 100 in the code above means that no more than 100 tasks run concurrently at any given time, so it behaves more like a sliding window than a batch. If you want to process the emails in batches, you can do something like this (assuming each batch has a Count property):

public async Task MainProcess()
{
    var batches = emailsToProcess.Batch(CONST_BATCHES_SIZE);

    foreach (var batch in batches)
    {
        // Send every email in this batch concurrently and wait for all of them
        await batch.ForEachAsync(batch.Count, m => ProcessSingleEmail(m));

        // Mark the batch only after all of its sends have finished
        _emailsToProcessRepository.MarkBatchAsProcessed(batch);
    }
}
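If the Batch method in use returns plain sequences without a Count property, one option is to build the batches as lists. The sketch below is self-contained and makes several assumptions: BatchDemo, InBatches and ProcessInBatchesAsync are hypothetical names, and the send and markProcessed delegates stand in for ProcessSingleEmail and MarkBatchAsProcessed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BatchDemo
{
    // Splits the source into lists of at most `size` items,
    // so every batch exposes a Count.
    public static IEnumerable<List<T>> InBatches<T>(IEnumerable<T> source, int size)
    {
        var current = new List<T>(size);
        foreach (var item in source)
        {
            current.Add(item);
            if (current.Count == size)
            {
                yield return current;
                current = new List<T>(size);
            }
        }
        if (current.Count > 0)
            yield return current;
    }

    // Sends each batch concurrently, waits for the whole batch,
    // then marks it. Returns the number of batches processed.
    public static async Task<int> ProcessInBatchesAsync<T>(
        IEnumerable<T> items,
        int batchSize,
        Func<T, Task> send,
        Action<IReadOnlyList<T>> markProcessed)
    {
        int batches = 0;
        foreach (var batch in InBatches(items, batchSize))
        {
            await Task.WhenAll(batch.Select(send));
            markProcessed(batch);
            batches++;
        }
        return batches;
    }
}
```

With 10 items and a batch size of 4, this yields three batches of 4, 4 and 2 items, and markProcessed runs once per batch after its sends complete. Note that a slow email holds up the rest of its batch, which is the trade-off compared with the sliding-window version.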

      
