How do I create an infinite DataFlow grid with exception handling?

I am creating a task processor that uses TPL DataFlow. I will be following the producer-producer model where Producer produces some items that need to be handled from time to time and consumers keep waiting for new products to appear. Here is my code:

async Task Main()
{
    var runner = new Runner();
    CancellationTokenSource cts = new CancellationTokenSource();
    Task runnerTask = runner.ExecuteAsync(cts.Token);

    await Task.WhenAll(runnerTask);
}

public class Runner
{
    public async Task ExecuteAsync(CancellationToken cancellationToken) {
        var random = new Random();

        ActionMeshProcessor processor = new ActionMeshProcessor();
        await processor.Init(cancellationToken);

        while (!cancellationToken.IsCancellationRequested)
        {
            await Task.Delay(TimeSpan.FromSeconds(1)); // wait before enqueuing more

            int[] items = GetItems(random.Next(3, 7));

            await processor.ProcessBlockAsync(items);
        }
    }

    private int[] GetItems(int count)
    {
        Random randNum = new Random();

        int[] arr = new int[count];
        for (int i = 0; i < count; i++)
        {
            arr[i] = randNum.Next(10, 20);
        }

        return arr;
    }
}

public class ActionMeshProcessor
{
    private TransformBlock<int, int> Transformer { get; set; }
    private ActionBlock<int> CompletionAnnouncer { get; set; }

    public async Task Init(CancellationToken cancellationToken)
    {
        var options = new ExecutionDataflowBlockOptions
        {
            CancellationToken = cancellationToken,
            MaxDegreeOfParallelism = 5,
            BoundedCapacity = 5
        };


        this.Transformer = new TransformBlock<int, int>(async input => {

            await Task.Delay(TimeSpan.FromSeconds(1)); //donig something complex here!

            if (input > 15)
            {
                throw new Exception($"I can't handle this number: {input}");
            }

            return input + 1;
        }, options);

        this.CompletionAnnouncer = new ActionBlock<int>(async input =>
        {
            Console.WriteLine($"Completed: {input}");

            await Task.FromResult(0);
        }, options);

        this.Transformer.LinkTo(this.CompletionAnnouncer);

        await Task.FromResult(0); // what do I await here?
    }

    public async Task ProcessBlockAsync(int[] arr)
    {
        foreach (var item in arr)
        {
            await this.Transformer.SendAsync(item); // await if there are no free slots
        }       
    }
}

      

I added a health check above to throw the exception to simulate an exception.

Here are my questions:

  • How can I handle exceptions in the above grid without dropping the whole grid?

  • Is there a better way to initialize / start / continue an infinite DataFlow mesh?

  • Where do I wait for completion?

I have looked at this similar question

+3


source to share


1 answer


Exceptions

Nothing is asynchronous in yours init

, it could be a standard synchronous constructor. You can handle exceptions in your grid without removing the grid with a simple try catch in the lamda that you provide to the block. Then you can handle this case by either filtering the result from your grid or ignoring the result in the following blocks. Below is an example of filtering. For the simple case, int

you can use int?

and filter any value that was null

, or of course, you can set any type of magic indicator value if you like. If you are actually looping around a reference type, you can either pop out the null, or mark the data item as dirty, which can be checked by a predicate on your reference.

public class ActionMeshProcessor {
    private TransformBlock<int, int?> Transformer { get; set; }
    private ActionBlock<int?> CompletionAnnouncer { get; set; }

    public ActionMeshProcessor(CancellationToken cancellationToken) {
        var options = new ExecutionDataflowBlockOptions {
            CancellationToken = cancellationToken,
            MaxDegreeOfParallelism = 5,
            BoundedCapacity = 5
        };


        this.Transformer = new TransformBlock<int, int?>(async input => {
            try {
                await Task.Delay(TimeSpan.FromSeconds(1)); //donig something complex here!

                if (input > 15) {
                    throw new Exception($"I can't handle this number: {input}");
                }

                return input + 1;
            } catch (Exception ex) {
                return null;
            }

        }, options);

        this.CompletionAnnouncer = new ActionBlock<int?>(async input =>
        {
            if (input == null) throw new ArgumentNullException("input");

            Console.WriteLine($"Completed: {input}");

            await Task.FromResult(0);
        }, options);

        //Filtering
        this.Transformer.LinkTo(this.CompletionAnnouncer, x => x != null);
        this.Transformer.LinkTo(DataflowBlock.NullTarget<int?>());
    }

    public async Task ProcessBlockAsync(int[] arr) {
        foreach (var item in arr) {
            await this.Transformer.SendAsync(item); // await if there are no free slots
        }
    }
}

      

Completion



You can expose to Complete()

and Completion

from your processor and use them to await

terminate when your app shuts down, assuming that this is the only time you've turned off the grid. Also, make sure you propagate completion correctly through your links.

    //Filtering
    this.Transformer.LinkTo(this.CompletionAnnouncer, new DataflowLinkOptions() { PropagateCompletion = true }, x => x != null);
    this.Transformer.LinkTo(DataflowBlock.NullTarget<int?>());
}        

public void Complete() {
    Transformer.Complete();
}

public Task Completion {
    get { return CompletionAnnouncer.Completion; }
}

      

Then, based on your example, the most likely place to terminate is outside of the loop that controls your processing:

public async Task ExecuteAsync(CancellationToken cancellationToken) {
    var random = new Random();

    ActionMeshProcessor processor = new ActionMeshProcessor();
    await processor.Init(cancellationToken);

    while (!cancellationToken.IsCancellationRequested) {
        await Task.Delay(TimeSpan.FromSeconds(1)); // wait before enqueuing more

        int[] items = GetItems(random.Next(3, 7));

        await processor.ProcessBlockAsync(items);
    }
    //asuming you don't intend to throw from cancellation
    processor.Complete();
    await processor.Completion();

}

      

+3


source







All Articles