C# TPL: calling tasks in parallel and creating new files asynchronously

I am trying to learn TPL. I am writing to files in parallel, for example:

public async Task SaveToFilesAsync(string path, List<string> list, CancellationToken ct)
{
    int count = 0;
    foreach (var str in list)
    {
        string fullPath = path + @"\" + count.ToString() + "_element.txt";
        using (var sw = File.CreateText(fullPath))
        {
            await sw.WriteLineAsync(str);
        }
        count++;

        Log("Saved in thread: {0} to {1}", 
           Environment.CurrentManagedThreadId,
           fullPath);

        if (ct.IsCancellationRequested)
            ct.ThrowIfCancellationRequested();
    }
}

      

And call it like this:

var tasks = new List<Task>();

try
{
    tasks.Add(SaveToFilesAsync(path, myListOfStrings, cts.Token));
}
catch (Exception ex)
{
    Log("Failed to save: " + ex.Message);
    throw;
}

tasks.Add(MySecondFuncAsync(cts.Token));
//...
tasks.Add(MyLastFuncAsync(cts.Token));

try
{
    //Or should I call await Task.WhenAll(tasks) ? What should I call here?
    Task.WaitAll(tasks.ToArray()); 
}
catch (AggregateException ex)
{
    foreach (var v in ex.InnerExceptions)
       Error(ex.Message + " " + v.Message);
}
finally
{
   cts.Dispose();
} 

foreach (var task in tasks)
{
    // Now, how to print results from the tasks?
    // Considering that all tasks return bool value,
    // I need to do something like this:
    if (task.Status != TaskStatus.Faulted)
        Console.WriteLine(task.Result);
    else
        Log("Error...");
}

      

My goal is to have all the functions (SaveToFilesAsync, MySecondFuncAsync, and so on) run concurrently, in parallel, using all the cores on the computer to save time. But when I look at the logs from SaveToFilesAsync, I see that the files are always saved on the same thread, not in parallel. What am I doing wrong? Second question: how can I get Task.Result from each task in the task list at the end of the code? If the second function returns Task<bool>, how do I get the bool value in my code? Also, all comments about my code are very welcome, as I am new to TPL.

2 answers


You need to replace the foreach loop, which runs sequentially from the first item to the last, with Parallel.ForEach(), which can be configured for parallelism, or Parallel.For(), which gives you the index of the item being processed. Since you need a counter for the file names, either change the list parameter so that each entry carries the file number you assign when you build the list, or use the index provided by Parallel.For(). Another option would be a long variable that you advance with Interlocked.Increment as each file name is generated, but I'm not sure that would be optimal; I haven't tried it.

This is how it will look.

Wrap the call to SaveFilesAsync in a try/catch to handle cancellation requested through the CancellationTokenSource:

var cts = new CancellationTokenSource();

try
{
    Task.WaitAll(SaveFilesAsync(@"C:\Some\Path", files, cts.Token));
}
catch (Exception)
{
    Debug.Print("SaveFilesAsync Exception");
}
finally
{
    cts.Dispose();
}

      



Then do your parallelism in this method.

public async Task SaveFilesAsync(string path, List<string> list, CancellationToken token)
{
    int counter = 0;

    var options = new ParallelOptions
                      {
                          CancellationToken = token,
                          MaxDegreeOfParallelism = Environment.ProcessorCount,
                          TaskScheduler = TaskScheduler.Default
                      };

    await Task.Run(
        () =>
            {
                try
                {
                    Parallel.ForEach(
                        list,
                        options,
                        (item, state) =>
                            {
                                // if cancellation is requested, this will throw an OperationCanceledException caught outside the Parallel loop
                                options.CancellationToken.ThrowIfCancellationRequested();

                                // safely increment and get your next file number
                                // (Interlocked.Increment returns the incremented value, so numbering starts at 1)
                                int index = Interlocked.Increment(ref counter);
                                string fullPath = string.Format(@"{0}\{1}_element.txt", path, index);

                                using (var sw = File.CreateText(fullPath))
                                {
                                    sw.WriteLine(item);
                                }

                                Debug.Print(
                                    "Saved in thread: {0} to {1}",
                                    Thread.CurrentThread.ManagedThreadId,
                                    fullPath);
                            });
                }
                catch (OperationCanceledException)
                {
                    Debug.Print("Operation Canceled");
                }
            });
}

      

The rest of your code doesn't change; just adapt the place where you build your list of file contents.

Edit: The try/catch around the call to SaveFilesAsync does nothing for cancellation, because the OperationCanceledException is already handled inside SaveFilesAsync.
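
The Parallel.For() option mentioned at the start of this answer can be sketched as follows. This is only a minimal illustration, not the code the answer was tested with: the class name ParallelFileWriter and the method SaveFiles are hypothetical, and it assumes the target directory already exists. Because the loop index is used directly as the file number, no shared counter is needed and file i always holds list item i.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelFileWriter
{
    // Hypothetical helper: writes list[i] to "<path>/<i>_element.txt".
    public static void SaveFiles(string path, IReadOnlyList<string> list, CancellationToken token)
    {
        var options = new ParallelOptions
        {
            CancellationToken = token,
            MaxDegreeOfParallelism = Environment.ProcessorCount
        };

        // Parallel.For passes the index to the body, so numbering starts at 0
        // and matches the position in the list. If the token is cancelled,
        // Parallel.For throws an OperationCanceledException.
        Parallel.For(0, list.Count, options, i =>
        {
            string fullPath = Path.Combine(path, i + "_element.txt");
            File.WriteAllText(fullPath, list[i] + Environment.NewLine);
        });
    }
}
```

This method blocks until every file is written, so a caller that must stay responsive could wrap it in Task.Run, as SaveFilesAsync does above.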


Try the following:

public async Task SaveToFileAsync(string fullPath, string line)
{
    using (var sw = File.CreateText(fullPath))
    {
        await sw.WriteLineAsync(line);
    }

    Log("Saved in thread: {0} to {1}", 
       Environment.CurrentManagedThreadId,
       fullPath);
}

public async Task SaveToFilesAsync(string path, List<string> list)
{
    await Task.WhenAll(
        list
            .Select((line, i) =>
                SaveToFileAsync(
                    string.Format(
                        @"{0}\{1}_element.txt",
                        path,
                        i),
                    line)));
}

      



Since you only write one line per file and want to write all of them, I don't think the operation needs to be cancellable.
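
The same Task.WhenAll pattern may also help with the second part of the question, reading a bool from each task. The sketch below assumes every function returns Task<bool> and that the tasks are kept in a List<Task<bool>> rather than a List<Task>; WorkAsync and RunAllAsync are hypothetical names used only for illustration. Awaiting Task.WhenAll rethrows just the first exception, so the sketch swallows it and then inspects each task individually.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class TaskResults
{
    // Hypothetical stand-in for one of the question's bool-returning functions.
    public static async Task<bool> WorkAsync(int n)
    {
        await Task.Yield();
        if (n < 0) throw new ArgumentOutOfRangeException(nameof(n));
        return n % 2 == 0;
    }

    // Starts all tasks, waits for them without blocking, then reports each outcome.
    public static async Task<List<string>> RunAllAsync(IEnumerable<int> inputs)
    {
        List<Task<bool>> tasks = inputs.Select(n => WorkAsync(n)).ToList();

        try
        {
            await Task.WhenAll(tasks);
        }
        catch
        {
            // Only the first exception is rethrown here; every task is checked below.
        }

        var report = new List<string>();
        foreach (var task in tasks)
        {
            // Result is safe to read only when the task ran to completion.
            report.Add(task.Status == TaskStatus.RanToCompletion
                ? task.Result.ToString()
                : "Error: " + task.Exception?.GetBaseException().Message);
        }
        return report;
    }
}
```

Checking for TaskStatus.RanToCompletion instead of "not Faulted" also covers cancelled tasks, whose Result would otherwise throw.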
