Why is the exception not being handled in this asynchronous rewrapper using TPL?

I am on .Net 4 and have the following "Retry" Async wrapper:

public static Task<T> Retry<T, TException>(Func<T> work, Action<TException> onException, TimeSpan retryInterval, int maxExecutionCount = 3) where TException : Exception
{
    for (var i = 0; i < maxExecutionCount; i++)
    {
        try
        {
            return Task.Factory.StartNew(work);
        } 
        catch (AggregateException ae)
        {
            ae.Handle(e =>
            {
                if (e is TException)
                {
                    // allow program to continue in this case
                    // do necessary logging or whatever
                    if (onException != null) { onException((TException)e); }

                    Thread.Sleep(retryInterval);
                    return true;
                }
                throw new RetryWrapperException("Unexpected exception occurred", ae);
            });
        }
    }
    var msg = "Retry unsuccessful after: {0} attempt(s)".FormatWith(maxExecutionCount);
    throw new RetryWrapperException(msg);
}

      

What I am trying to test using:

[TestFixture]
public class RetryWrapperTest
{
    private static int _counter;

    private Func<string> _work;
    private Action<InvalidOperationException> _onException;
    private TimeSpan _retryInterval;
    private int _maxRetryCount;

    [TestFixtureSetUp]
    public void SetUp()
    {
        _counter = 0;

        _work =  GetSampleResult;
        _onException = e => Console.WriteLine("Caught the exception: {0}", e);
        _retryInterval = TimeSpan.FromSeconds(5);
        _maxRetryCount = 4;
    }

    [Test]
    public void Run()
    {
        var resultTask = RetryWrapper.Retry(_work, _onException, _retryInterval, _maxRetryCount);
        Console.WriteLine("This wrapper doesn't block!");

        // Why is this line throwing? I expect the exception to be handled
        // by the wrapper
        var result = resultTask.Result;
        result.Should().NotBeNullOrWhiteSpace();
        result.Should().Be("Sample result!");
    }

    private static string GetSampleResult()
    {
        if (_counter < 3)
        {
            _counter++;
            throw new InvalidOperationException("Baaah!");
        }
        return "Sample result!";
    }
}

      

However, an Aggregate Exception is thrown instead of being caught. The whole point of using this wrapper is not to put try-catch around var result = resultTask.Result;

, what am I doing wrong?

Please note, I cannot use async- await or Bcl.Async as I am on .Net 4, VS 2010

+3


source to share


3 answers


What you want to do is not easy. In fact, it is so hard to get "right" that I wrote a library specifically to address this problem ( Rackspace Thread Library , Apache 2.0).

Desired behavior

The desired behavior for your code is most easily described with async

/ await

, although the final version of the code will not use this feature. I made a few changes to your method Retry

for this.

  • To improve the method's ability to support truly asynchronous operations, I changed the parameter work

    from Func<T>

    to Func<Task<T>>

    .
  • To avoid blocking the thread unnecessarily (even if it's a background thread), I used Task.Delay

    instead Thread.Sleep

    .
  • I made the assumption that onException

    never null

    . While this assumption is not easy to overlook for code using async

    / await

    , it could be resolved for the implementation below if needed.
  • I made the assumption that if the task being returned work

    enters a state Faulted

    , it will have a property InnerExceptions

    that only contains 1 exception. If it contains more than one exception, all but the first will be ignored. While this assumption is not easy to overlook for code using async

    / await

    , it could be resolved for the implementation below if needed.

Here is the resulting implementation. Note that I could have used a loop for

instead of a loop while

, but as you will see, this complicates the next step.

public async Task<T> Retry<T, TException>(Func<Task<T>> work, Action<TException> onException, TimeSpan retryInterval, int maxExecutionCount)
    where TException : Exception
{
    int count = 0;
    while (count < maxExecutionCount)
    {
        if (count > 0)
            await Task.Delay(retryInterval);

        count++;

        try
        {
            return await work();
        }
        catch (TException ex)
        {
            onException(ex);
        }
        catch (Exception ex)
        {
            throw new RetryWrapperException("Unexpected exception occurred", ex);
        }
    }

    string message = string.Format("Retry unsuccessful after: {0} attempt(s)", maxExecutionCount);
    throw new RetryWrapperException(message);
}

      

Porting to .NET 4.0

Converting this code to .NET 4.0 (and even .NET 3.5) uses the following features of the Rackspace threading library:

  • TaskBlocks.While

    : To convert the loop while

    .
  • CoreTaskExtensions.Select

    : to perform synchronous operations after successfully completing an antecedent task.
  • CoreTaskExtensions.Then

    : to perform asynchronous operations after the antecedent task has successfully completed.
  • CoreTaskExtensions.Catch

    (new for V1.1): for exception handling.
  • DelayedTask.Delay

    (new for V1.1): for behavior Task.Delay

    .

There are several behavioral differences between this implementation and above. In particular:



  • If it work

    does null

    , the one returned by this method Task

    will transition into a canceled state (as shown by this test ), where the above method will go into an error state due to a NullReferenceException

    .
  • This implementation behaves as if it was ConfigureAwait(false)

    called before each await

    in the previous implementation. At least in my opinion, it's actually not bad.
  • If a method onException

    throws an exception, this implementation will wrap that exception in RetryWrapperException

    . In other words, this implementation actually models this code, not the block written in the Desired Behavior section :

    try
    {
        try
        {
            return await work();
        }
        catch (TException ex)
        {
            onException(ex);
        }
    }
    catch (Exception ex)
    {
        throw new RetryWrapperException("Unexpected exception occurred", ex);
    }
    
          

Here is the resulting implementation:

public static Task<T> Retry<T, TException>(Func<Task<T>> work, Action<TException> onException, TimeSpan retryInterval, int maxExecutionCount)
    where TException : Exception
{
    int count = 0;
    bool haveResult = false;
    T result = default(T);

    Func<bool> condition = () => count < maxExecutionCount;
    Func<Task> body =
        () =>
        {
            Task t1 = count > 0 ? DelayedTask.Delay(retryInterval) : CompletedTask.Default;

            Task t2 =
                t1.Then(
                    _ =>
                    {
                        count++;
                        return work();
                    })
                .Select(
                    task =>
                    {
                        result = task.Result;
                        haveResult = true;
                    });

            Task t3 =
                t2.Catch<TException>(
                    (_, ex) =>
                    {
                        onException(ex);
                    })
                .Catch<Exception>((_, ex) =>
                    {
                        throw new RetryWrapperException("Unexpected exception occurred", ex);
                    });

            return t3;
        };

    Func<Task, T> selector =
        _ =>
        {
            if (haveResult)
                return result;

            string message = string.Format("Retry unsuccessful after: {0} attempt(s)", maxExecutionCount);
            throw new RetryWrapperException(message);
        };

    return
        TaskBlocks.While(condition, body)
        .Select(selector);
}

      

Test example

The following test method demonstrates the features described above as described.

[TestMethod]
public void Run()
{
    Func<Task<string>> work = GetSampleResultAsync;
    Action<InvalidOperationException> onException = e => Console.WriteLine("Caught the exception: {0}", e);
    TimeSpan retryInterval = TimeSpan.FromSeconds(5);
    int maxRetryCount = 4;

    Task<string> resultTask = Retry(work, onException, retryInterval, maxRetryCount);
    Console.WriteLine("This wrapper doesn't block");

    var result = resultTask.Result;
    Assert.IsFalse(string.IsNullOrWhiteSpace(result));
    Assert.AreEqual("Sample result!", result);
}

private static int _counter;

private static Task<string> GetSampleResultAsync()
{
    if (_counter < 3)
    {
        _counter++;
        throw new InvalidOperationException("Baaah!");
    }

    return CompletedTask.FromResult("Sample result!");
}

      

Future considerations

If you really want to implement rock hardness, I recommend that you further modify your code in the following ways.

  • Cancellation of support.

    • Add a parameter CancellationToken cancellationToken

      as the last parameter of the method Retry

      .
    • Change the type work

      to Func<CancellationToken, Task<T>>

      .
    • Pass the argument cancellationToken

      to work

      and to the call DelayedTask.Delay

      .
  • Support deferral policies. You can remove the options retryInterval

    and maxExecutionCount

    and use instead IEnumerable<TimeSpan>

    , or you can enable and use the interface, for example IBackoffPolicy

    along with the default implementation, for example BackoffPolicy

    (both licensed by MIT).

+4


source


If you want this wrapper to work, you will need to use async / await, or write a catch block using continuation. This is because the exception is not thrown until you try to get the result.

To do this with async / await in .NET 4.0 download Microsoft.Bcl.Async which gives support for async / awend 4.0



public static async Task<T> Retry<T, TException>(Func<T> work, Action<TException> onException, TimeSpan retryInterval, int maxExecutionCount = 3) where TException : Exception
{
    for (var i = 0; i < maxExecutionCount; i++)
    {
        try
        {
            return await Task.Factory.StartNew(work);
        } 
        catch (AggregateException ae)
        {
            ae.Handle(e =>
            {
                if (e is TException)
                {
                    // allow program to continue in this case
                    // do necessary logging or whatever
                    if (onException != null) { onException((TException)e); }

                    Thread.Sleep(retryInterval);
                    return true;
                }
                throw new RetryWrapperException("Unexpected exception occurred", ae);
            });
        }
    }
    var msg = "Retry unsuccessful after: {0} attempt(s)".FormatWith(maxExecutionCount);
    throw new RetryWrapperException(msg);
}

      

I'm not really sure which way to do this without async / await. I know what it will be .ContunueWith(

with TaskContinuationOptions.OnlyOnFaulted

, but I'm not sure about the exact details.

+1


source


There are several errors in the code:

  • As mentioned earlier, you are not await

    at an executable Task

    , there is no chance for it to terminate and catch the exception, since the returned one Task

    immediately fires the caller. If await

    not available, you will have to use the ContinueWith

    continuation style method instead of this implementation.

  • You are using Thread.Sleep

    inside your code. As far as you know, someone might be using this in their main thread without understanding why this thread got stuck all of a sudden, which is incredibly unreliable.

  • If the delegate onException

    is false, you are throwing the exception using throw new

    that will lose you the StackTrace of your method. Is this what you're really after? the best suggestion might be to keep them all and use AggregationException

    at the end of the reps.

This post contains what you need:

public static Task StartNewDelayed(int millisecondsDelay, Action action) 
{ 
    // Validate arguments 
    if (millisecondsDelay < 0) 
        throw new ArgumentOutOfRangeException("millisecondsDelay"); 
    if (action == null) throw new ArgumentNullException("action"); 

    // Create the task 
    var t = new Task(action); 
    // Start a timer that will trigger it 
    var timer = new Timer( 
        _ => t.Start(), null, millisecondsDelay, Timeout.Infinite); 
    t.ContinueWith(_ => timer.Dispose());
    return t; 
}

private static Task<T> Retry<T>(Func<T> func, int retryCount, int delay, TaskCompletionSource<T> tcs = null)
{
    if (tcs == null)
        tcs = new TaskCompletionSource<T>();
    Task.Factory.StartNew(func).ContinueWith(_original =>
    {
        if (_original.IsFaulted)
        {
            if (retryCount == 0)
                tcs.SetException(_original.Exception.InnerExceptions);
            else
                Task.Factory.StartNewDelayed(delay).ContinueWith(t =>
                {
                    Retry(func, retryCount - 1, delay,tcs);
                });
        }
        else
            tcs.SetResult(_original.Result);
    });
    return tcs.Task;
} 

      

0


source







All Articles