How to use TPL TransformBlock while waiting for multiple inputs?

If I have multiple data sources (say, from a database) and then need to do some CPU-bound work on the combined data, how can I represent this with TPL Dataflow?

I noticed that TransformBlock takes a single input source, but my input comes from multiple sources, and I want to get as much parallelism as possible.

What is the best way to use plain TPL or the Parallel Extensions to do the I/O-bound work against the database and then merge that data into a single input for the TransformBlock?

1 answer


Have a look at JoinBlock; it might be what you need.
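A minimal sketch of how a JoinBlock could merge two sources before the CPU work, assuming two independent sources whose results must be paired. `LoadCustomerAsync` and `LoadOrderCountAsync` are hypothetical stand-ins for database queries (simulated with `Task.Delay`), and the string concatenation is a placeholder for the real CPU-bound step.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical I/O-bound queries; Task.Delay stands in for real database calls.
static async Task<string> LoadCustomerAsync(int id)
{
    await Task.Delay(50);
    return $"customer{id}";
}

static async Task<int> LoadOrderCountAsync(int id)
{
    await Task.Delay(80);
    return id * 10;
}

// JoinBlock emits a Tuple once it holds one item on each of its targets.
var join = new JoinBlock<string, int>();

// CPU-bound step: runs once per (customer, orderCount) pair. Output order
// still matches input order because EnsureOrdered defaults to true.
var process = new TransformBlock<Tuple<string, int>, string>(
    pair => $"{pair.Item1}:{pair.Item2}",
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount });

join.LinkTo(process, new DataflowLinkOptions { PropagateCompletion = true });

var ids = new[] { 1, 2, 3 };

// The two sources run concurrently with each other, but each one posts its
// results in id order, because JoinBlock pairs items by arrival order, not by key.
var customers = Task.Run(async () =>
{
    foreach (var id in ids)
        await join.Target1.SendAsync(await LoadCustomerAsync(id));
});
var orders = Task.Run(async () =>
{
    foreach (var id in ids)
        await join.Target2.SendAsync(await LoadOrderCountAsync(id));
});

await Task.WhenAll(customers, orders);
join.Complete();

// Drain the pipeline until it completes.
var results = new List<string>();
while (await process.OutputAvailableAsync())
    results.Add(await process.ReceiveAsync());
await process.Completion;

foreach (var r in results) Console.WriteLine(r);
```

With the placeholder data this prints `customer1:10`, `customer2:20` and `customer3:30`, in that order. The main caveat of JoinBlock is that it pairs strictly by arrival order on each target; if your sources can return items out of order relative to each other, you need to carry a key and match the items up yourself.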

You can also create a custom dataflow block type that does what you want.

For example, suppose you want to wait for 5 objects to arrive, "process" them together, and return a single object (I'm using ExpandoObject here purely for illustration) to one receiver, which should also wait asynchronously:

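using System;
using System.Collections.Generic;
using System.Dynamic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public sealed class YourCustomBlock : IPropagatorBlock<ExpandoObject, ExpandoObject>
{
    // The target part of the block (incoming items).
    private readonly ITargetBlock<ExpandoObject> m_target;
    // The source part of the block (outgoing results).
    private readonly ISourceBlock<ExpandoObject> m_source;
    // Number of inputs to collect before processing them together.
    private readonly int _size;
    // Temporary holding area for incoming items. Only the head ActionBlock
    // touches it, and that block processes one item at a time by default.
    private readonly Queue<ExpandoObject> _queue;

    public YourCustomBlock(int inputs)
    {
        _size = inputs;
        _queue = new Queue<ExpandoObject>(_size);

        var mainWorker = new TransformBlock<ExpandoObject[], ExpandoObject>(async expandoArray =>
        {
            // Do your stuff with expandoArray here. The delay and the
            // "Items" property are placeholders for illustration only.
            await Task.Delay(1000).ConfigureAwait(false);
            var result = new ExpandoObject();
            ((IDictionary<string, object>)result)["Items"] = expandoArray;
            return result;
        });

        var head = new ActionBlock<ExpandoObject>(async item =>
        {
            _queue.Enqueue(item);
            // Post the batch once the required number of inputs has arrived.
            if (_queue.Count == _size)
            {
                await mainWorker.SendAsync(_queue.ToArray()).ConfigureAwait(false);
                _queue.Clear();
            }
        });

        // Propagate completion (or a fault) from the head to mainWorker.
        // Any partial batch still in _queue at that point is dropped.
        head.Completion.ContinueWith(t =>
        {
            if (t.IsFaulted) ((IDataflowBlock)mainWorker).Fault(t.Exception);
            else mainWorker.Complete();
        });

        // Expose the two inner blocks as a single block.
        m_source = mainWorker;
        m_target = head;
    }

    // ITargetBlock/IDataflowBlock members: input and completion requests go
    // to the head, while Completion is observed on mainWorker.
    public DataflowMessageStatus OfferMessage(DataflowMessageHeader header, ExpandoObject value,
        ISourceBlock<ExpandoObject> source, bool consumeToAccept)
        => m_target.OfferMessage(header, value, source, consumeToAccept);
    public void Complete() => m_target.Complete();
    public void Fault(Exception exception) => m_target.Fault(exception);
    public Task Completion => m_source.Completion;

    // ISourceBlock members: delegate to mainWorker.
    public IDisposable LinkTo(ITargetBlock<ExpandoObject> target, DataflowLinkOptions linkOptions)
        => m_source.LinkTo(target, linkOptions);
    public ExpandoObject ConsumeMessage(DataflowMessageHeader header,
        ITargetBlock<ExpandoObject> target, out bool messageConsumed)
        => m_source.ConsumeMessage(header, target, out messageConsumed);
    public bool ReserveMessage(DataflowMessageHeader header, ITargetBlock<ExpandoObject> target)
        => m_source.ReserveMessage(header, target);
    public void ReleaseReservation(DataflowMessageHeader header, ITargetBlock<ExpandoObject> target)
        => m_source.ReleaseReservation(header, target);
}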

Usage example:

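var myBlock = new YourCustomBlock(5);

// Send five inputs; in practice these would come from your (database) sources.
var producer = Task.Run(async () =>
{
    for (var i = 0; i < 5; i++)
    {
        await myBlock.SendAsync(new ExpandoObject()).ConfigureAwait(false);
    }
});

// Waits asynchronously until the five inputs have been combined into one result.
var result = await myBlock.ReceiveAsync().ConfigureAwait(false);
await producer;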

Note: this has not been verified by compiling it and is just an illustration of the idea.
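If the only custom behaviour you need is "collect N items, then process them together", a possible alternative is to let the built-in BatchBlock<T> do the collecting (the job of the head ActionBlock in YourCustomBlock) and use DataflowBlock.Encapsulate to expose the BatchBlock-to-TransformBlock pipeline as one IPropagatorBlock, without implementing the interface by hand. This is a minimal sketch; `CreateBatchingBlock` and the "Items" property are hypothetical names, and the wrapping is a placeholder for the real work.

```csharp
using System;
using System.Collections.Generic;
using System.Dynamic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical factory: BatchBlock groups every `inputs` items into an array,
// and the TransformBlock does the (placeholder) work on each full batch.
static IPropagatorBlock<ExpandoObject, ExpandoObject> CreateBatchingBlock(int inputs)
{
    var batch = new BatchBlock<ExpandoObject>(inputs);
    var worker = new TransformBlock<ExpandoObject[], ExpandoObject>(items =>
    {
        // Placeholder for the CPU work: wrap the batch in a new ExpandoObject.
        var result = new ExpandoObject();
        ((IDictionary<string, object>)result)["Items"] = items;
        return result;
    });
    batch.LinkTo(worker, new DataflowLinkOptions { PropagateCompletion = true });
    // Messages enter through `batch` and leave through `worker`.
    return DataflowBlock.Encapsulate(batch, worker);
}

var block = CreateBatchingBlock(5);
for (var i = 0; i < 5; i++)
{
    await block.SendAsync(new ExpandoObject());
}

// Completes once BatchBlock has emitted a full batch of 5 and the worker has run.
var combined = await block.ReceiveAsync();
var batchItems = (ExpandoObject[])((IDictionary<string, object>)combined)["Items"];
Console.WriteLine(batchItems.Length);
```

Unlike the hand-written block, BatchBlock can also flush a final partial batch when it is completed, and completion flows through the link because of `PropagateCompletion`.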
