How to parallelize writing files using TPL?

I am trying to save a list of strings to multiple files, one string per file, and do all of it at the same time. I do it like this:

public async Task SaveToFilesAsync(string path, List<string> list, CancellationToken ct)
{
    int count = 0;
    foreach (var str in list)
    {
        string fullPath = path + @"\" + count.ToString() + "_element.txt";
        using (var sw = File.CreateText(fullPath))
        {
            await sw.WriteLineAsync(str);
        }
        count++;

        NLog.Trace("Saved in thread: {0} to {1}", 
           Environment.CurrentManagedThreadId,
           fullPath);

        if (ct.IsCancellationRequested)
            ct.ThrowIfCancellationRequested();
    }
}


And call it like this:

try
{
   // await the task; otherwise an OperationCanceledException is never observed by this catch
   await SaveToFilesAsync(@"D:\Test", myListOfString, ct);
}
catch(OperationCanceledException)
{
   NLog.Info("Operation has been cancelled by user.");
}


But the log file clearly shows that every save happens on the same thread ID, so there seems to be no parallelism. What am I doing wrong, and how can I fix it? My goal is to save all the files as fast as possible using all of the computer's cores.



4 answers


Essentially, your problem is that the foreach loop is synchronous. It iterates an IEnumerable, which is synchronous, and each iteration awaits its write before the next iteration starts.

To work around this, first move the body of the loop into a separate asynchronous method.

public async Task WriteToFile(
        string path,
        string str,
        int count)
{
    var fullPath = string.Format("{0}\\{1}_element.txt", path, count);
    using (var sw = File.CreateText(fullPath))
    {
        await sw.WriteLineAsync(str);
    }

    NLog.Trace("Saved in TaskID: {0} to \"{1}\"", 
       Task.CurrentId,
       fullPath);
}


Then, instead of looping synchronously, project the sequence of lines onto a sequence of tasks, each of which runs the extracted loop body. The projection itself is not an asynchronous operation, but it does not block either, because nothing is awaited inside it.

Then await all of the tasks together. They complete in whatever order the task scheduler happens to run them.



public async Task SaveToFilesAsync(
        string path,
        IEnumerable<string> list,
        CancellationToken ct)
{
    // requires using System.Linq; for Select
    await Task.WhenAll(list.Select((str, count) => WriteToFile(path, str, count)));
}


There is nothing here to cancel, so there is no point passing the cancellation token along.

I used the indexed overload of Select to supply the count value.
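For reference, the indexed overload of Select passes each element's zero-based position as the second lambda argument. A minimal sketch (IndexedSelectDemo and FileNames are hypothetical names used only for illustration) that produces the same file names the question uses:

```csharp
using System.Collections.Generic;
using System.Linq;

public static class IndexedSelectDemo
{
    // Hypothetical helper: maps each line to the file name it would be written to,
    // using the element's zero-based position supplied by the indexed Select overload.
    public static List<string> FileNames(IEnumerable<string> lines)
    {
        return lines
            .Select((str, count) => count + "_element.txt")
            .ToList();
    }
}
```

For the input { "a", "b", "c" }, this yields "0_element.txt", "1_element.txt" and "2_element.txt".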

I changed your logging code to log the current task ID instead of the thread ID, which avoids confusion caused by how the work is scheduled.
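A self-contained sketch of the approach in this answer, assuming a writable target directory. ProjectionDemo and WriteToFileAsync are hypothetical names. The logging is omitted, and Path.Combine replaces the manual string formatting of the path. Every write is started by the projection before any of them is awaited, and then all of the tasks are awaited together:

```csharp
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

public static class ProjectionDemo
{
    // Writes one line to one file; same shape as WriteToFile above, minus the logging.
    private static async Task WriteToFileAsync(string directory, string line, int index)
    {
        string fullPath = Path.Combine(directory, index + "_element.txt");
        using (var sw = File.CreateText(fullPath))
        {
            await sw.WriteLineAsync(line);
        }
    }

    // Starts every write first, then awaits all of them together.
    public static async Task SaveToFilesAsync(string directory, IEnumerable<string> lines)
    {
        // ToList() runs the projection once, so every task is created here.
        List<Task> writes = lines
            .Select((line, index) => WriteToFileAsync(directory, line, index))
            .ToList();

        await Task.WhenAll(writes);
    }
}
```

Task.WhenAll enumerates its argument only once, so the ToList() call is not strictly required; it simply makes the point where the tasks start explicit. File.CreateText does not open the file for asynchronous I/O, and a short WriteLineAsync may complete synchronously into the writer's buffer. This sketch therefore shows the shape of the pattern rather than guaranteeing that the writes overlap.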



If you want this to run in parallel, you have to tell .NET to do so. I think one of the easiest ways to do that becomes clear once you split your code into an additional method.

The idea is to move the single I/O operation into a separate asynchronous method, call that method without awaiting it, keep the returned tasks in a list, and await them all at the end.

I don't write C# code at all, so please excuse any syntax errors I might make:

public async Task SaveToFilesAsync(string path, List<string> list, CancellationToken ct)
{
    int count = 0;
    var writeOperations = new List<Task>(list.Count);
    foreach (var str in list)
    { 
        string fullPath = path + @"\" + count.ToString() + "_element.txt";
        // start the write without awaiting it; keep the task so it can be awaited later
        writeOperations.Add(SaveToFileAsync(fullPath, str, ct));
        count++;
        ct.ThrowIfCancellationRequested();
    }

    // wait for all of the writes started above
    await Task.WhenAll(writeOperations);
}

private async Task SaveToFileAsync(string path, string line, CancellationToken ct)
{
    using (var sw = File.CreateText(path))
    {
        await sw.WriteLineAsync(line);
    }

    NLog.Trace("Saved in thread: {0} to {1}", 
        Environment.CurrentManagedThreadId, 
        path);

    ct.ThrowIfCancellationRequested();
}




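This way, all of the I/O operations are started one after the other by the same thread, without waiting for earlier ones to finish, so this should be really fast. The continuation that follows each I/O operation then runs on the .NET ThreadPool (assuming there is no SynchronizationContext, as in a console application).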

I also removed the if (ct.IsCancellationRequested) check, because ct.ThrowIfCancellationRequested() already performs that check itself.
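A minimal sketch of why the extra check is redundant: ThrowIfCancellationRequested throws OperationCanceledException only after cancellation has been requested, and it does nothing otherwise. CancellationDemo and Throws are hypothetical names used only for illustration.

```csharp
using System;
using System.Threading;

public static class CancellationDemo
{
    // Returns true if ThrowIfCancellationRequested threw for the given token.
    // The call throws only when cancellation has been requested, so wrapping it
    // in "if (ct.IsCancellationRequested)" adds nothing.
    public static bool Throws(CancellationToken ct)
    {
        try
        {
            ct.ThrowIfCancellationRequested();
            return false;
        }
        catch (OperationCanceledException)
        {
            return true;
        }
    }
}
```

With a fresh CancellationTokenSource, Throws(cts.Token) returns false. After cts.Cancel(), it returns true.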

I hope this gives you an idea of how to approach this kind of problem.



If your storage handles parallel access well (an SSD, for example), parallelizing can make this faster. Since there is no built-in way to run asynchronous loops with a specific degree of parallelism, I would recommend PLINQ with a fixed degree of parallelism and synchronous I/O. With Parallel.ForEach it is not possible to set a fixed degree of parallelism (DOP), only a maximum DOP.
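A minimal sketch of the PLINQ approach this answer recommends, assuming the target directory already exists. PlinqSaveDemo and SaveToFiles are hypothetical names. The sketch uses WithDegreeOfParallelism to choose the degree of parallelism, WithCancellation to honor the token, and plain synchronous File.WriteAllText for each file:

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;

public static class PlinqSaveDemo
{
    // Writes lines[i] to "<i>_element.txt" using PLINQ with the given degree of
    // parallelism and synchronous file I/O. Returns the paths written (in no particular order).
    public static string[] SaveToFiles(
        string directory,
        IReadOnlyList<string> lines,
        int degreeOfParallelism,
        CancellationToken ct)
    {
        return Enumerable.Range(0, lines.Count)
            .AsParallel()
            .WithDegreeOfParallelism(degreeOfParallelism)
            .WithCancellation(ct)
            .Select(i =>
            {
                string fullPath = Path.Combine(directory, i + "_element.txt");
                // Synchronous write: blocks this PLINQ worker until the file is written.
                File.WriteAllText(fullPath, lines[i] + Environment.NewLine);
                return fullPath;
            })
            .ToArray();
    }
}
```

The right degree of parallelism depends on the storage. Environment.ProcessorCount is one possible starting point, but it is an assumption worth measuring rather than a rule. If the token is cancelled, the query throws OperationCanceledException.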



I added my answer to the original question; should I add it here instead? See "C# TPL calling tasks in parallel and creating new files asynchronously".

Edit: the proposed solution now runs multiple saves in parallel.

You need to replace the foreach loop, which runs sequentially from the first item to the last, with a Parallel.ForEach() loop, which can be configured for parallelism.

var cts = new CancellationTokenSource();
Task.WaitAll(SaveFilesAsync(@"C:\Some\Path", files, cts.Token));
cts.Dispose();


Then do the parallel work inside this method:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public async Task SaveFilesAsync(string path, List<string> list, CancellationToken token)
{
    var options = new ParallelOptions
                      {
                          CancellationToken = token,
                          MaxDegreeOfParallelism = Environment.ProcessorCount,
                          TaskScheduler = TaskScheduler.Default
                      };

    await Task.Run(
        () =>
            {
                try
                {
                    Parallel.ForEach(
                        list,
                        options,
                        (item, state, index) =>
                            {
                                // if cancellation is requested, this will throw an OperationCanceledException caught outside the Parallel loop
                                options.CancellationToken.ThrowIfCancellationRequested();

                                // use the item's zero-based position in the list as the file number,
                                // so file N always holds list[N], as in the question
                                string fullPath = string.Format(@"{0}\{1}_element.txt", path, index);

                                using (var sw = File.CreateText(fullPath))
                                {
                                    sw.WriteLine(item);
                                }

                                Debug.Print(
                                    "Saved in thread: {0} to {1}",
                                    Thread.CurrentThread.ManagedThreadId,
                                    fullPath);
                            });
                }
                catch (OperationCanceledException)
                {
                    Debug.Print("Operation Canceled");
                }
            });
}

