Recursive async calls with TPL / async await

I want to process a hierarchical structure recursively using C#'s asynchronous features (TPL / async / await). Here is an overview of what I am trying to do.

I have a collection of jobs to process, as shown below. Each job has some work to do and may have one or more children that also have work to do. All parent and child jobs call the same function to do the actual "work", and that function is asynchronous (code below).

/*
 *  Jobs Collection
 *  |
 *  |__ Job1
 *  |    |__ Job4
 *  |    |     |__ Job7
 *  |    |
 *  |    |__ Job5
 *  |
 *  |__ Job2
 *  |    |__ Job6
 *  |
 *  |__ Job3
 *  |
 */

      

  • There are 3 levels in the hierarchy.

  • I would like to start processing the first level (Job1, Job2, Job3) in parallel.

  • Once they start in parallel, each individual job does its own processing, waits for that processing to finish (important), and then processes its children recursively until the hierarchy ends. Children depend on the data produced by the parent, so they must wait for the parent to complete.

  • The actual "work" (called by both parents and children) runs asynchronously because the underlying call is asynchronous, so there is no need to start a new thread (Task.Factory.StartNew()).

Here is some sample code that demonstrates the scenario:

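// Requires: using System; using System.Linq; using System.Runtime.CompilerServices;
//           using System.Threading; using System.Threading.Tasks;
public void Process()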
{
    WebJob[] jobs = CreateWebJobs(); // dummy jobs

    // first level 
    Parallel.ForEach(jobs,
                new ParallelOptions { MaxDegreeOfParallelism = 2 }, // parallelism hardcoded for simplicity
                (job) => ExecuteJob(job));
}

private void ExecuteJob(WebJob job, [CallerMemberName] string memberName = "")
{
    Console.ForegroundColor = ConsoleColor.DarkYellow;
    Console.WriteLine("Caller> {0} :: {1} Job> {2} :: {3} Thread> {4}", memberName, "\t", job.Name, "\t", Thread.CurrentThread.ManagedThreadId);

    Task t = GetDataAsync(job);
    t.Wait(); // needed so that the parent response is received before the children start (?)


    if (job.Children != null)
    {
        job.Children.ToList().ForEach((r) =>
        {
            r.ParentResponse = job.Response; // Children need parent response
            ExecuteJob(r);
        });
    }
}

private async Task GetDataAsync(WebJob j)
{
    // This is just test code. Ideally it would be an external call to some "async" method
    await Task.Delay(1000);
    j.Response = string.Format("{0} complete", j.Name);
    Console.ForegroundColor = ConsoleColor.Cyan;
    Console.WriteLine("parentResp>> {0} :: {1} Job>> {2} :: {3} Thread>> {4}", j.ParentResponse, "\t", j.Name, "\t", Thread.CurrentThread.ManagedThreadId);
    Console.WriteLine("--------------");
}

private WebJob[] CreateWebJobs()
{
    return new WebJob[] {
        new WebJob() { Id=1, Name = "Job1", ExecURL = "http://url1", 
            Children = new WebJob[] 
            {
                new WebJob() 
                { 
                    Id=2, Name = "Job2", ExecURL = "http://url2", 
                    Children = new WebJob[] 
                    {
                        new WebJob() { Id=4, Name = "Job4", ExecURL = "http://url4" }
                    }
                },
                new WebJob() 
                { 
                    Id=3, Name = "Job3", ExecURL = "http://url3" 
                }
            }
        },
        new WebJob() { Id=5, Name = "Job5", ExecURL = "http://url5"}                
    };
}

      

  • The Process method starts by running all "first level" jobs in parallel with each other.
  • The ExecuteJob method then takes over and recurses into the children to complete all processing.
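
The sample does not show the WebJob type itself. A minimal sketch, inferred only from the members the code above touches, might look like the following; the property types are assumptions and the real class may carry more.

```csharp
// Hypothetical shape of WebJob, inferred only from how the sample code uses it.
public class WebJob
{
    public int Id { get; set; }
    public string Name { get; set; }
    public string ExecURL { get; set; }         // endpoint the real job would call
    public string Response { get; set; }        // set by GetDataAsync when the job finishes
    public string ParentResponse { get; set; }  // copied from the parent before a child runs
    public WebJob[] Children { get; set; }      // null for leaf jobs
}
```

Leaving Children null for leaf jobs matches the `job.Children != null` check in ExecuteJob.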

This works fine, but I'm not sure whether this recursive asynchronous pattern is an efficient approach. I would like to avoid t.Wait(). I tried ContinueWith on t, which as far as I understand makes no difference. I also read about the ForEachAsync pattern and wondered whether it would be a good fit. This solution will eventually become an ASP.NET Web API service. Any thoughts on this recursive asynchronous pattern?

2 answers


If GetDataAsync is the only blocking operation you have, you can use asynchronous programming throughout, which removes the need for Parallel.ForEach or blocking Wait calls.

public async Task Process()
{
    WebJob[] jobs = CreateWebJobs(); // dummy jobs

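    // A lambda is needed here: a method group cannot supply the optional memberName argument.
    await Task.WhenAll(jobs.Select(job => ExecuteJob(job)));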
}

private async Task ExecuteJob(WebJob job, [CallerMemberName] string memberName = "")
{
    Console.ForegroundColor = ConsoleColor.DarkYellow;
    Console.WriteLine("Caller> {0} :: {1} Job> {2} :: {3} Thread> {4}", memberName, "\t", job.Name, "\t", Thread.CurrentThread.ManagedThreadId);

    await GetDataAsync(job);

    if (job.Children != null)
    {
        var childTasks = job.Children.Select(r =>
        {
            r.ParentResponse = job.Response;
            return ExecuteJob(r);
        });

        await Task.WhenAll(childTasks);
    }
}
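
To check that this shape keeps the ordering the question asks for (a parent finishes before its children start, while siblings overlap), here is a small self-contained sketch. Node, OrderingDemo and the 50 ms delay are stand-ins invented for the demo, not part of the original code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for WebJob, reduced to what this demo needs.
public class Node
{
    public string Name { get; set; }
    public Node[] Children { get; set; }
}

public static class OrderingDemo
{
    // Runs a node's own work first, then all of its children concurrently.
    private static async Task ExecuteAsync(Node node, ConcurrentQueue<string> completed)
    {
        await Task.Delay(50);          // placeholder for the real asynchronous call
        completed.Enqueue(node.Name);  // this node's own work is finished here

        if (node.Children != null)
        {
            await Task.WhenAll(node.Children.Select(child => ExecuteAsync(child, completed)));
        }
    }

    // Returns job names in the order in which their own work finished.
    public static async Task<string[]> RunAsync()
    {
        var root = new Node
        {
            Name = "Job1",
            Children = new[]
            {
                new Node { Name = "Job2", Children = new[] { new Node { Name = "Job4" } } },
                new Node { Name = "Job3" }
            }
        };

        var completed = new ConcurrentQueue<string>();
        await ExecuteAsync(root, completed);
        return completed.ToArray();
    }

    public static void Main()
    {
        string[] order = RunAsync().GetAwaiter().GetResult();
        Console.WriteLine(string.Join(", ", order));
    }
}
```

Job1 is always recorded first and Job4 always after Job2; the relative order of siblings such as Job2 and Job3 is not guaranteed.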

      



Edit: If the top-level method needs to block (rather than risk consumers calling it fire-and-forget), do the following:

public void Process()
{
    WebJob[] jobs = CreateWebJobs(); // dummy jobs

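    // ToArray() materializes the tasks for the params Task[] overload of Task.WaitAll.
    Task.WaitAll(jobs.Select(job => ExecuteJob(job)).ToArray());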
}
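
One practical difference between the blocking and the awaiting variants is how failures surface to the caller. The sketch below is only an illustration; FailureDemo and FailAsync are hypothetical names, and the jobs fail on purpose.

```csharp
using System;
using System.Threading.Tasks;

public static class FailureDemo
{
    // Hypothetical job that fails after its asynchronous work.
    private static async Task FailAsync(string name)
    {
        await Task.Delay(10);
        throw new InvalidOperationException(name + " failed");
    }

    // Blocking variant: Task.WaitAll wraps failures in an AggregateException.
    public static string BlockingFailureType()
    {
        try
        {
            Task.WaitAll(FailAsync("Job1"), FailAsync("Job2"));
            return "none";
        }
        catch (Exception ex)
        {
            return ex.GetType().Name;
        }
    }

    // Awaiting variant: await rethrows one of the original exceptions directly.
    public static async Task<string> AwaitedFailureTypeAsync()
    {
        try
        {
            await Task.WhenAll(FailAsync("Job1"), FailAsync("Job2"));
            return "none";
        }
        catch (Exception ex)
        {
            return ex.GetType().Name;
        }
    }

    public static void Main()
    {
        Console.WriteLine(BlockingFailureType());                              // AggregateException
        Console.WriteLine(AwaitedFailureTypeAsync().GetAwaiter().GetResult()); // InvalidOperationException
    }
}
```

Callers of the blocking Process would therefore need to catch AggregateException and inspect InnerExceptions, while callers of the async version can catch the original exception type.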

      



Since your core operation is asynchronous, you shouldn't use parallelism or multithreading at all. What you want is concurrency without parallelism, that is, asynchronous concurrency, which is usually done with Task.WhenAll.

This is doubly true because you plan to deploy to ASP.NET, where parallelism can dramatically reduce scalability.
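
As a rough illustration of concurrency without parallelism, the sketch below counts how many asynchronous operations are in flight at once without dedicating a thread to any of them. ConcurrencyDemo, WorkAsync and the 100 ms delay are assumptions made for the demo.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class ConcurrencyDemo
{
    // Starts `count` asynchronous operations via Task.WhenAll and returns the
    // highest number that were in flight at the same time.
    public static async Task<int> MaxInFlightAsync(int count)
    {
        int inFlight = 0;
        int maxInFlight = 0;
        var gate = new object();

        async Task WorkAsync()
        {
            lock (gate)
            {
                inFlight++;
                maxInFlight = Math.Max(maxInFlight, inFlight);
            }

            await Task.Delay(100); // placeholder for real asynchronous I/O; no thread is blocked here

            lock (gate)
            {
                inFlight--;
            }
        }

        await Task.WhenAll(Enumerable.Range(0, count).Select(_ => WorkAsync()));
        return maxInFlight;
    }

    public static void Main()
    {
        Console.WriteLine(MaxInFlightAsync(3).GetAwaiter().GetResult());
    }
}
```

Each call to WorkAsync runs synchronously up to its first await, so all of the operations have started before the first delay elapses.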



public async Task ProcessAsync()
{
  WebJob[] jobs = CreateWebJobs();

  await Task.WhenAll(jobs.Select(x => ExecuteJobAsync(x)));
}

private async Task ExecuteJobAsync(WebJob job, [CallerMemberName] string memberName = "")
{
  Console.ForegroundColor = ConsoleColor.DarkYellow;
  Console.WriteLine("Caller> {0} :: {1} Job> {2} :: {3} Thread> {4}", memberName, "\t", job.Name, "\t", Thread.CurrentThread.ManagedThreadId);

  await GetDataAsync(job);
  if (job.Children != null)
  {
    var childTasks = job.Children.Select(async x =>
    {
      x.ParentResponse = job.Response; // Children need parent response
      await ExecuteJobAsync(x);
    });
    await Task.WhenAll(childTasks);
  }
}
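
The original Parallel.ForEach call capped work at MaxDegreeOfParallelism = 2, which Task.WhenAll alone does not do. If a cap still matters, one possible approach (an assumption here, not something the question or the answers prescribe) is to throttle with SemaphoreSlim. Job, ThrottledRunner and the 50 ms delay are hypothetical names and values for the sketch.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for WebJob.
public class Job
{
    public string Name { get; set; }
    public Job[] Children { get; set; }
}

public class ThrottledRunner
{
    private readonly SemaphoreSlim throttle;
    private readonly object gate = new object();
    private int running;

    public int MaxRunning { get; private set; }  // highest number of jobs working at once
    public int Processed { get; private set; }   // total jobs that finished their own work

    public ThrottledRunner(int maxConcurrency)
    {
        throttle = new SemaphoreSlim(maxConcurrency);
    }

    public async Task ExecuteAsync(Job job)
    {
        // Hold a slot only while this job's own work runs. Holding it while
        // waiting for children could deadlock once every slot belongs to a parent.
        await throttle.WaitAsync();
        try
        {
            lock (gate) { running++; MaxRunning = Math.Max(MaxRunning, running); }
            await Task.Delay(50); // placeholder for the real asynchronous call
            lock (gate) { running--; Processed++; }
        }
        finally
        {
            throttle.Release();
        }

        if (job.Children != null)
        {
            await Task.WhenAll(job.Children.Select(child => ExecuteAsync(child)));
        }
    }

    public static void Main()
    {
        var jobs = new[]
        {
            new Job { Name = "Job1", Children = new[]
            {
                new Job { Name = "Job2", Children = new[] { new Job { Name = "Job4" } } },
                new Job { Name = "Job3" }
            } },
            new Job { Name = "Job5" }
        };

        var runner = new ThrottledRunner(2);
        Task.WhenAll(jobs.Select(j => runner.ExecuteAsync(j))).GetAwaiter().GetResult();
        Console.WriteLine("processed {0}, max concurrent {1}", runner.Processed, runner.MaxRunning);
    }
}
```

Unlike MaxDegreeOfParallelism, this limits the number of in-flight asynchronous operations across the whole tree rather than the number of threads.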

      
