How to use DataflowBlockOptions.CancellationToken?

How can i use DataflowBlockOptions.CancellationToken

?

If I create an instance BufferBlock

like this:

var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = _cts.Token });

then using the consumer / producer methods it uses queue

, how can I use its CancellationToken to cancel the cancellation?

eg. in the producer method, how can I check the cancellation token - I didn't find any property to access the token.

EDIT: Example of production / consumption methods:

private static async Task Produce(BufferBlock<int> queue, IEnumerable<int> values)
{
    foreach (var value in values)
    {
        await queue.SendAsync(value);
    }

    queue.Complete();
}

private static async Task<IEnumerable<int>> Consume(BufferBlock<int> queue)
{
    var ret = new List<int>();
    while (await queue.OutputAvailableAsync())
    {
        ret.Add(await queue.ReceiveAsync());
    }

    return ret;
}

      

Code to call:

var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = _cts.Token });

// Start the producer and consumer.
var values = Enumerable.Range(0, 10);
Produce(queue, values);
var consumer = Consume(queue);

// Wait for everything to complete.
await Task.WhenAll(consumer, queue.Completion);

      

EDIT2:

If I call _cts.Cancel()

, the method Produce

does not cancel and ends without interruption.

+3


source to share


1 answer


If you want to cancel the processing process, you must pass a token into it, for example:



    private static async Task Produce(
        BufferBlock<int> queue, 
        IEnumerable<int> values,
        CancellationToken token
        )
    {
        foreach (var value in values)
        {
            await queue.SendAsync(value, token);
            Console.WriteLine(value);
        }

        queue.Complete();
    }

    private static async Task<IEnumerable<int>> Consume(BufferBlock<int> queue)
    {
        var ret = new List<int>();
        while (await queue.OutputAvailableAsync())
        {
            ret.Add(await queue.ReceiveAsync());
        }

        return ret;
    }

    static void Main(string[] args)
    {
        var cts = new CancellationTokenSource();

        var queue = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 5, CancellationToken = cts.Token });

        // Start the producer and consumer.
        var values = Enumerable.Range(0, 100);
        Produce(queue, values, cts.Token);
        var consumer = Consume(queue);

        cts.Cancel();

        try
        {
            Task.WaitAll(consumer, queue.Completion);
        }
        catch (Exception e)
        {
            Console.WriteLine(e.ToString());
        }

        foreach (var i in consumer.Result)
        {
            Console.WriteLine(i);
        }

        Console.ReadKey();

      

+1


source







All Articles