Why is the exception not being handled in this asynchronous retry wrapper using TPL?
I am on .Net 4 and have the following "Retry" Async wrapper:
public static Task<T> Retry<T, TException>(Func<T> work, Action<TException> onException, TimeSpan retryInterval, int maxExecutionCount = 3) where TException : Exception
{
    for (var i = 0; i < maxExecutionCount; i++)
    {
        try
        {
            return Task.Factory.StartNew(work);
        }
        catch (AggregateException ae)
        {
            ae.Handle(e =>
            {
                if (e is TException)
                {
                    // allow program to continue in this case
                    // do necessary logging or whatever
                    if (onException != null) { onException((TException)e); }
                    Thread.Sleep(retryInterval);
                    return true;
                }
                throw new RetryWrapperException("Unexpected exception occurred", ae);
            });
        }
    }
    var msg = "Retry unsuccessful after: {0} attempt(s)".FormatWith(maxExecutionCount);
    throw new RetryWrapperException(msg);
}
What I am trying to test using:
[TestFixture]
public class RetryWrapperTest
{
    private static int _counter;
    private Func<string> _work;
    private Action<InvalidOperationException> _onException;
    private TimeSpan _retryInterval;
    private int _maxRetryCount;

    [TestFixtureSetUp]
    public void SetUp()
    {
        _counter = 0;
        _work = GetSampleResult;
        _onException = e => Console.WriteLine("Caught the exception: {0}", e);
        _retryInterval = TimeSpan.FromSeconds(5);
        _maxRetryCount = 4;
    }

    [Test]
    public void Run()
    {
        var resultTask = RetryWrapper.Retry(_work, _onException, _retryInterval, _maxRetryCount);
        Console.WriteLine("This wrapper doesn't block!");
        // Why is this line throwing? I expect the exception to be handled
        // by the wrapper
        var result = resultTask.Result;
        result.Should().NotBeNullOrWhiteSpace();
        result.Should().Be("Sample result!");
    }

    private static string GetSampleResult()
    {
        if (_counter < 3)
        {
            _counter++;
            throw new InvalidOperationException("Baaah!");
        }
        return "Sample result!";
    }
}
However, an AggregateException is thrown instead of being caught. The whole point of using this wrapper is to avoid putting a try-catch around var result = resultTask.Result;. What am I doing wrong?
Please note, I cannot use async/await or Bcl.Async as I am on .NET 4, VS 2010.
What you want to do is not easy. In fact, it is hard enough to get "right" that I wrote a library specifically to address this problem (Rackspace Threading Library, Apache 2.0).
Desired behavior
The desired behavior for your code is most easily described with async/await, although the final version of the code will not use this feature. I made a few changes to your Retry method for this.
- To improve the method's ability to support truly asynchronous operations, I changed the work parameter from Func<T> to Func<Task<T>>.
- To avoid blocking the thread unnecessarily (even if it's a background thread), I used Task.Delay instead of Thread.Sleep.
- I made the assumption that onException is never null. While this assumption is not easy to drop for code using async/await, it could be addressed for the implementation below if needed.
- I made the assumption that if the task returned by work enters the Faulted state, its InnerExceptions property contains only one exception. If it contains more than one exception, all but the first will be ignored. While this assumption is not easy to drop for code using async/await, it could be addressed for the implementation below if needed.
Here is the resulting implementation. Note that I could have used a for loop instead of a while loop, but as you will see, that would complicate the next step.
public async Task<T> Retry<T, TException>(Func<Task<T>> work, Action<TException> onException, TimeSpan retryInterval, int maxExecutionCount)
    where TException : Exception
{
    int count = 0;
    while (count < maxExecutionCount)
    {
        if (count > 0)
            await Task.Delay(retryInterval);
        count++;
        try
        {
            return await work();
        }
        catch (TException ex)
        {
            onException(ex);
        }
        catch (Exception ex)
        {
            throw new RetryWrapperException("Unexpected exception occurred", ex);
        }
    }
    string message = string.Format("Retry unsuccessful after: {0} attempt(s)", maxExecutionCount);
    throw new RetryWrapperException(message);
}
Porting to .NET 4.0
Converting this code to .NET 4.0 (and even .NET 3.5) uses the following features of the Rackspace Threading Library:
- TaskBlocks.While: to convert the while loop.
- CoreTaskExtensions.Select: to perform synchronous operations after an antecedent task completes successfully.
- CoreTaskExtensions.Then: to perform asynchronous operations after an antecedent task completes successfully.
- CoreTaskExtensions.Catch (new in V1.1): for exception handling.
- DelayedTask.Delay (new in V1.1): for the behavior of Task.Delay.
There are several behavioral differences between this implementation and the one above. In particular:
- If work returns null, the Task returned by this method will transition to the canceled state (as shown by this test), whereas the method above will transition to the faulted state due to a NullReferenceException.
- This implementation behaves as if ConfigureAwait(false) were called before each await in the previous implementation. In my opinion at least, that is actually not a bad thing.
- If the onException method throws an exception, this implementation will wrap that exception in a RetryWrapperException. In other words, this implementation actually models the following code, not the block written in the Desired behavior section:
try
{
    try
    {
        return await work();
    }
    catch (TException ex)
    {
        onException(ex);
    }
}
catch (Exception ex)
{
    throw new RetryWrapperException("Unexpected exception occurred", ex);
}
Here is the resulting implementation:
public static Task<T> Retry<T, TException>(Func<Task<T>> work, Action<TException> onException, TimeSpan retryInterval, int maxExecutionCount)
    where TException : Exception
{
    int count = 0;
    bool haveResult = false;
    T result = default(T);

    Func<bool> condition = () => count < maxExecutionCount;
    Func<Task> body =
        () =>
        {
            Task t1 = count > 0 ? DelayedTask.Delay(retryInterval) : CompletedTask.Default;

            Task t2 =
                t1.Then(
                    _ =>
                    {
                        count++;
                        return work();
                    })
                .Select(
                    task =>
                    {
                        result = task.Result;
                        haveResult = true;
                    });

            Task t3 =
                t2.Catch<TException>(
                    (_, ex) =>
                    {
                        onException(ex);
                    })
                .Catch<Exception>((_, ex) =>
                    {
                        throw new RetryWrapperException("Unexpected exception occurred", ex);
                    });

            return t3;
        };

    Func<Task, T> selector =
        _ =>
        {
            if (haveResult)
                return result;

            string message = string.Format("Retry unsuccessful after: {0} attempt(s)", maxExecutionCount);
            throw new RetryWrapperException(message);
        };

    return
        TaskBlocks.While(condition, body)
        .Select(selector);
}
Test example
The following test method demonstrates that the implementation above behaves as described.
[TestMethod]
public void Run()
{
    Func<Task<string>> work = GetSampleResultAsync;
    Action<InvalidOperationException> onException = e => Console.WriteLine("Caught the exception: {0}", e);
    TimeSpan retryInterval = TimeSpan.FromSeconds(5);
    int maxRetryCount = 4;

    Task<string> resultTask = Retry(work, onException, retryInterval, maxRetryCount);
    Console.WriteLine("This wrapper doesn't block");

    var result = resultTask.Result;
    Assert.IsFalse(string.IsNullOrWhiteSpace(result));
    Assert.AreEqual("Sample result!", result);
}

private static int _counter;

private static Task<string> GetSampleResultAsync()
{
    if (_counter < 3)
    {
        _counter++;
        throw new InvalidOperationException("Baaah!");
    }
    return CompletedTask.FromResult("Sample result!");
}
Future considerations
If you really want the implementation to be rock solid, I recommend that you further modify your code in the following ways.
- Support cancellation.
  - Add a CancellationToken cancellationToken parameter as the last parameter of the Retry method.
  - Change the type of work to Func<CancellationToken, Task<T>>.
  - Pass the cancellationToken argument to work and to the call to DelayedTask.Delay.
- Support backoff policies. You can remove the retryInterval and maxExecutionCount parameters and use an IEnumerable<TimeSpan> instead, or you can incorporate and use an interface such as IBackoffPolicy along with a default implementation such as BackoffPolicy (both are MIT licensed).
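To make the IEnumerable<TimeSpan> idea a little more concrete, here is a minimal sketch of a delay sequence that a retry loop could enumerate, with one element per retry. BackoffSketch and Exponential are hypothetical names chosen for illustration; they are not part of the Rackspace Threading Library or of the IBackoffPolicy interface mentioned above.

```csharp
using System;
using System.Collections.Generic;

static class BackoffSketch
{
    // Hypothetical helper: yields one delay per retry, doubling each time
    // and never exceeding maxDelay.
    public static IEnumerable<TimeSpan> Exponential(TimeSpan initialDelay, TimeSpan maxDelay, int retryCount)
    {
        TimeSpan delay = initialDelay > maxDelay ? maxDelay : initialDelay;
        for (int i = 0; i < retryCount; i++)
        {
            yield return delay;
            long doubled = delay.Ticks * 2;
            delay = doubled > maxDelay.Ticks ? maxDelay : TimeSpan.FromTicks(doubled);
        }
    }

    static void Main()
    {
        // Prints 1, 2, 4 and 5 (seconds), one value per line.
        foreach (TimeSpan delay in Exponential(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5), 4))
            Console.WriteLine(delay.TotalSeconds);
    }
}
```

With a sequence like this, the number of elements plays the role of maxExecutionCount and each element plays the role of retryInterval, so the caller decides both in one place.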
If you want this wrapper to work, you will need to use async/await, or write the catch logic in a continuation. This is because the exception is not thrown until you try to get the result.
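A small sketch of that last point, using only the .NET 4 TPL (DeferredExceptionDemo and Observe are hypothetical names for illustration): Task.Factory.StartNew merely schedules the delegate and returns, so a try/catch around it sees nothing. The failure only surfaces, wrapped in an AggregateException, when Result is read.

```csharp
using System;
using System.Threading.Tasks;

static class DeferredExceptionDemo
{
    // Reports where the exception from the delegate actually surfaces.
    public static string Observe()
    {
        Task<string> task;
        try
        {
            // StartNew only schedules the work; the delegate's exception is
            // stored in the task instead of being thrown here.
            task = Task.Factory.StartNew<string>(() => { throw new InvalidOperationException("Baaah!"); });
        }
        catch (Exception)
        {
            return "thrown by StartNew";
        }

        try
        {
            // Reading Result blocks until the task finishes and rethrows
            // the stored failure wrapped in an AggregateException.
            return task.Result;
        }
        catch (AggregateException ae)
        {
            return "thrown by Result: " + ae.InnerException.GetType().Name;
        }
    }

    static void Main()
    {
        // Prints "thrown by Result: InvalidOperationException"
        Console.WriteLine(Observe());
    }
}
```

This is why the catch block in the question's Retry method never runs: by the time the delegate fails, Retry has already returned the task to the caller.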
To do this with async/await in .NET 4.0, download Microsoft.Bcl.Async, which adds async/await support to .NET 4.0.
public static async Task<T> Retry<T, TException>(Func<T> work, Action<TException> onException, TimeSpan retryInterval, int maxExecutionCount = 3) where TException : Exception
{
    for (var i = 0; i < maxExecutionCount; i++)
    {
        try
        {
            return await Task.Factory.StartNew(work);
        }
        catch (TException e)
        {
            // await rethrows the task's original exception rather than an
            // AggregateException, so catch the expected type directly
            // allow program to continue in this case
            // do necessary logging or whatever
            if (onException != null) { onException(e); }
            Thread.Sleep(retryInterval);
        }
        catch (Exception e)
        {
            throw new RetryWrapperException("Unexpected exception occurred", e);
        }
    }
    var msg = "Retry unsuccessful after: {0} attempt(s)".FormatWith(maxExecutionCount);
    throw new RetryWrapperException(msg);
}
I'm not really sure how to do this without async/await. I know it would involve .ContinueWith( with TaskContinuationOptions.OnlyOnFaulted, but I'm not sure about the exact details.
There are several errors in the code:
- As mentioned earlier, you do not await the running Task, so there is no chance for it to complete and for the exception to be caught, because the returned Task is handed back to the caller immediately. If await is not available, you will have to use a ContinueWith continuation-style approach instead of this implementation.
- You are using Thread.Sleep inside your code. For all you know, someone might call this on their main thread without understanding why that thread suddenly got stuck, which is incredibly unreliable.
- If the exception is not the expected TException (the case where the onException delegate is not invoked), you throw using throw new, which loses the StackTrace of your method. Is this what you're really after? A better suggestion might be to keep all the exceptions and use an AggregateException at the end of the retries.
This post contains what you need:
public static Task StartNewDelayed(int millisecondsDelay, Action action)
{
    // Validate arguments
    if (millisecondsDelay < 0)
        throw new ArgumentOutOfRangeException("millisecondsDelay");
    if (action == null) throw new ArgumentNullException("action");
    // Create the task
    var t = new Task(action);
    // Start a timer that will trigger it
    var timer = new Timer(
        _ => t.Start(), null, millisecondsDelay, Timeout.Infinite);
    t.ContinueWith(_ => timer.Dispose());
    return t;
}

private static Task<T> Retry<T>(Func<T> func, int retryCount, int delay, TaskCompletionSource<T> tcs = null)
{
    if (tcs == null)
        tcs = new TaskCompletionSource<T>();
    Task.Factory.StartNew(func).ContinueWith(_original =>
    {
        if (_original.IsFaulted)
        {
            if (retryCount == 0)
                tcs.SetException(_original.Exception.InnerExceptions);
            else
                // Wait for the delay, then retry via the StartNewDelayed helper defined above
                StartNewDelayed(delay, () => Retry(func, retryCount - 1, delay, tcs));
        }
        else
            tcs.SetResult(_original.Result);
    });
    return tcs.Task;
}
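The suggestion above to keep every failure and report them together at the end could be sketched as follows. This is a deliberately synchronous illustration under my own assumptions: CollectingRetrySketch and Run are hypothetical names, and the delay between attempts is left out. In the TaskCompletionSource version, the same list could be passed to tcs.SetException instead of being thrown.

```csharp
using System;
using System.Collections.Generic;

static class CollectingRetrySketch
{
    // Hypothetical helper: retries work up to maxAttempts times and, if every
    // attempt fails, throws one AggregateException containing all failures.
    public static T Run<T>(Func<T> work, int maxAttempts)
    {
        var failures = new List<Exception>();
        for (int attempt = 0; attempt < maxAttempts; attempt++)
        {
            try
            {
                return work();
            }
            catch (Exception ex)
            {
                // Keep the original exception object so its stack trace survives.
                failures.Add(ex);
            }
        }
        throw new AggregateException("All attempts failed", failures);
    }

    static void Main()
    {
        try
        {
            Run<string>(() => { throw new InvalidOperationException("Baaah!"); }, 3);
        }
        catch (AggregateException ae)
        {
            // Prints 3: one inner exception per failed attempt.
            Console.WriteLine(ae.InnerExceptions.Count);
        }
    }
}
```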