Nested async lambda not expected
The following code does not return the entire collection that is being iterated over. The returned array is of arbitrary length each time it is run. What's wrong?
public async Task<IHttpActionResult> GetClients()
{
var clientInfoCollection = new ConcurrentBag<ClientInfoModel>();
await _client.Iterate(async (client) =>
{
clientInfoCollection.Add(new ClientInfoModel
{
name = client.name,
userCount = await _user.Count(clientId)
});
});
return Ok(clientInfoCollection.ToArray());
}
The following code uses the new MongoDB C # async driver
public async Task Iterate(Action<TDocument> processor)
{
await _collection.Find<TDocument>(_ => true).ForEachAsync(processor);
}
source to share
The reason you see an arbitrary number of values ββis because it Iterate
gets a type delegate Action<T>
, which is equivalent async void
, which makes this "fire and forget" style of execution.
The inner method doesn't actually know that an async delegate was passed to it, so it iterates through the collection without asynchronously waiting for each item to complete.
Instead, you need to make the method parameter a type delegate Func<TDocument, Task>
and use the proper overloadForEachAsync
:
public Task Iterate(Func<TDocument, Task> processor)
{
return _collection.Find<TDocument>(_ => true).ForEachAsync(processor);
}
You can see the source here :
public static async Task ForEachAsync<TDocument>(
this IAsyncCursor<TDocument> source,
Func<TDocument, int, Task> processor,
CancellationToken cancellationToken = default(CancellationToken))
{
Ensure.IsNotNull(source, "source");
Ensure.IsNotNull(processor, "processor");
// yes, we are taking ownership... assumption being that they've
// exhausted the thing and don't need it anymore.
using (source)
{
var index = 0;
while (await source.MoveNextAsync(cancellationToken).ConfigureAwait(false))
{
foreach (var document in source.Current)
{
await processor(document, index++).ConfigureAwait(false);
cancellationToken.ThrowIfCancellationRequested();
}
}
}
}
source to share
You create threads and install them. From there, you cannot know what is going on. But your next code should return, so you are gambling that the threads will run faster than your main thread.
In normal streaming scenarios, you will join streams that add items to the bag. Where connection are threads waiting for other threads to execute and thus still asynchronous but waiting to return before everything is complete.
This is explained nicely here: http://www.dotnetperls.com/thread-join
source to share