Nested async lambda not awaited

The following code does not return the entire collection that is being iterated over. The returned array is of arbitrary length each time it is run. What's wrong?

public async Task<IHttpActionResult> GetClients()
{
    var clientInfoCollection = new ConcurrentBag<ClientInfoModel>();

    await _client.Iterate(async (client) =>
    {
        clientInfoCollection.Add(new ClientInfoModel
        {
            name = client.name,
            userCount = await _user.Count(clientId)
        });
    });

    return Ok(clientInfoCollection.ToArray());
}

      

The Iterate method below uses the new MongoDB C# async driver:

public async Task Iterate(Action<TDocument> processor)
{
    await _collection.Find<TDocument>(_ => true).ForEachAsync(processor);
}

      


2 answers


The reason you see an arbitrary number of values is that Iterate takes a delegate of type Action<T>. An async lambda converted to Action<T> is compiled as async void, which makes this a "fire and forget" style of execution.

The inner method doesn't actually know that an async delegate was passed to it, so it iterates through the collection without asynchronously waiting for each item to complete.
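The difference can be reproduced without MongoDB. The following is a minimal sketch, assuming a plain int array in place of the collection and Task.Delay in place of the _user.Count call; ForEachAction, ForEachFunc and the other names are hypothetical stand-ins, not driver APIs.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class FireAndForgetDemo
{
    // Mirrors the original Iterate: an Action<int> hands back no Task,
    // so there is nothing here that could be awaited.
    public static void ForEachAction(int[] items, Action<int> processor)
    {
        foreach (var item in items)
            processor(item); // returns at the lambda's first incomplete await
    }

    // Mirrors the fixed Iterate: each item's Task is awaited before moving on.
    public static async Task ForEachFunc(int[] items, Func<int, Task> processor)
    {
        foreach (var item in items)
            await processor(item);
    }

    public static int CountWithAction()
    {
        var bag = new ConcurrentBag<int>();
        ForEachAction(new[] { 1, 2, 3, 4, 5 }, async item =>
        {
            await Task.Delay(50); // stands in for the awaited user count (assumption)
            bag.Add(item);
        });
        return bag.Count; // read while the async void lambdas are still pending
    }

    public static async Task<int> CountWithFunc()
    {
        var bag = new ConcurrentBag<int>();
        await ForEachFunc(new[] { 1, 2, 3, 4, 5 }, async item =>
        {
            await Task.Delay(50);
            bag.Add(item);
        });
        return bag.Count; // every lambda has completed by now
    }

    public static async Task Main()
    {
        Console.WriteLine(CountWithAction());     // timing dependent, usually 0
        Console.WriteLine(await CountWithFunc()); // 5
    }
}
```

The lambda body is identical in both calls; only the delegate type it is converted to decides whether the caller can wait for it.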

Instead, you need to make the method parameter a delegate of type Func<TDocument, Task> and use the proper overload of ForEachAsync:



public Task Iterate(Func<TDocument, Task> processor)
{
    return _collection.Find<TDocument>(_ => true).ForEachAsync(processor);
}
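Which overload gets called is decided at compile time by the declared type of processor, not by whether the lambda behind it was written with async. A small sketch of that rule, where the two Describe overloads are hypothetical stand-ins for the Action-based and Func-based ForEachAsync overloads:

```csharp
using System;
using System.Threading.Tasks;

public static class OverloadDemo
{
    // Hypothetical stand-ins for the two kinds of ForEachAsync overloads.
    public static string Describe(Action<int> processor) => "Action<int>";
    public static string Describe(Func<int, Task> processor) => "Func<int, Task>";

    public static string FromActionParameter()
    {
        // Same async lambda, but typed as Action<int>: it becomes async void.
        Action<int> processor = async x => await Task.Yield();
        return Describe(processor);
    }

    public static string FromFuncParameter()
    {
        // Typed as Func<int, Task>: the Task is visible to the callee.
        Func<int, Task> processor = async x => await Task.Yield();
        return Describe(processor);
    }

    public static void Main()
    {
        Console.WriteLine(FromActionParameter()); // Action<int>
        Console.WriteLine(FromFuncParameter());   // Func<int, Task>
    }
}
```

This is why changing only the parameter type of Iterate is enough: the call inside it then binds to the overload that awaits each returned Task.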

      

The driver source shows that ForEachAsync awaits the processor for each document:

public static async Task ForEachAsync<TDocument>(
                    this IAsyncCursor<TDocument> source, 
                    Func<TDocument, int, Task> processor,
                    CancellationToken cancellationToken = default(CancellationToken))
{
    Ensure.IsNotNull(source, "source");
    Ensure.IsNotNull(processor, "processor");

    // yes, we are taking ownership... assumption being that they've
    // exhausted the thing and don't need it anymore.
    using (source)
    {
        var index = 0;
        while (await source.MoveNextAsync(cancellationToken).ConfigureAwait(false))
        {
            foreach (var document in source.Current)
            {
                await processor(document, index++).ConfigureAwait(false);
                cancellationToken.ThrowIfCancellationRequested();
            }
        }
    }
}

      



You start threads and set them off. From there you cannot know what is going on, but the code that follows still has to return, so you are gambling that the threads will finish before your main thread gets there.

In normal threading scenarios, you would join the threads that add items to the bag. A join makes one thread wait for other threads to finish, so the work still runs asynchronously but the method does not return before everything is complete.



This is explained nicely here: http://www.dotnetperls.com/thread-join
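As a rough illustration of the join idea with plain threads (not the Task-based code from the question), here is a minimal sketch; JoinDemo and FillBag are hypothetical names, and Thread.Sleep stands in for real work.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class JoinDemo
{
    public static int FillBag(int workerCount)
    {
        var bag = new ConcurrentBag<int>();
        var threads = new List<Thread>();

        for (var i = 0; i < workerCount; i++)
        {
            var value = i; // copy the loop variable so each closure gets its own
            var thread = new Thread(() =>
            {
                Thread.Sleep(20); // simulated work (assumption)
                bag.Add(value);
            });
            thread.Start();
            threads.Add(thread);
        }

        // Block until every worker has finished before reading the bag.
        foreach (var worker in threads)
            worker.Join();

        return bag.Count;
    }

    public static void Main()
    {
        Console.WriteLine(FillBag(4)); // 4
    }
}
```

Without the Join loop, the count would depend on how far the workers had got when the bag was read.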
