Waiting Job returns WaitingForActivation?

I have a consumer / producer setup that uses BlockingCollection.

public void StartConsumer(CancellationToken cancellationToken)
{
    Task.Factory.StartNew(async () =>
     {               

         foreach (var value in collection.GetConsumingEnumerable(cancellationToken))
         {
             var rowsAffected = await  GetFooAsync(value.Id);                     
         }                

     }, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Current)
     .ContinueWith(task => HandleConsumerStopped(task, cancellationToken), cancellationToken);
}

      

The task returned from the first thread to "ContinueWith" is always the result of the task. and this task has the result "WaitingForActivation"

So my question is, why does it exit the foreach loop? if it is not blocked until canceled cancellation?

is the iner GetFooAsync causing incorrect behavior from Foreach ??

+3


source to share


1 answer


Your current code will call HandleConsumerStopped

as soon as it hits the first one await

, because that's when your async method returns to the caller. Note that your StartNew returns Task<Task>

, you need to attach a continuation for the inner task. Your code is currently attaching it to an external task.

If you want to complete the entire async method, you can simply call the method HandleConsumerStopped

inside the async method itself.

Task.Factory.StartNew(async () =>
 { 
     foreach (var value in collection.GetConsumingEnumerable(cancellationToken))
     {
         var rowsAffected = await  GetFooAsync(value.Id);                     
     }

     HandleConsumerStopped();//Parameters removed
 }, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Current);

      



If your use is restricted ContinueWith

, you need to use Task.Run

or you need to call Unwrap

. If you use Task.Run

, you get Unwrap

free, but not TaskCreationOptions

.

 Task.Factory.StartNew(async () =>
 {          
     foreach (var value in collection.GetConsumingEnumerable(cancellationToken))
     {
         var rowsAffected = await GetFooAsync(value.Id);                     
     }                

 }, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Current)
 .UnWrap()//Note the unwrap here
 .ContinueWith(task => HandleConsumerStopped(task, cancellationToken), cancellationToken);

      

HandleConsumerStopped

Will now be launched as soon as the whole body StartNew

is complete, not in the middle foreach

.

+6


source







All Articles