TPL Dataflow blocks using a LinkTo predicate

I have a few blocks where a TransformBlock feeds one of three other TransformBlocks, chosen by the LinkTo predicate. I am using DataflowLinkOptions to propagate completion. The problem is that when the predicate matches and the chosen block starts, the rest of my pipeline carries on. I would expect the pipeline to wait for this block to complete.

The code for this is something like this:

var linkOptions = new DataflowLinkOptions {PropagateCompletion = true};
mainBlock.LinkTo(block1, linkOptions, x => x.Status == Status.Complete);
mainBlock.LinkTo(block2, linkOptions, x => x.Status == Status.Cancelled);
mainBlock.LinkTo(block3, linkOptions, x => x.Status == Status.Delayed);
mainBlock.LinkTo(DataflowBlock.NullTarget<Thing>(), linkOptions);

      

Now, as I said, this doesn't work as I expected, so the only way I have found to get the behavior I want is to take out linkOptions and add the following to the lambda for mainBlock.

mainBlock = new TransformBlock<Thing,Thing>(input =>
{
    DoMyStuff(input);

    // Complete only the block that matches the item's status,
    // once mainBlock itself has completed.
    if (input.Status == Status.Complete)
    {
        mainBlock.Completion.ContinueWith(t => block1.Complete());
    }
    if (input.Status == Status.Cancelled)
    {
        mainBlock.Completion.ContinueWith(t => block2.Complete());
    }
    if (input.Status == Status.Delayed)
    {
        mainBlock.Completion.ContinueWith(t => block3.Complete());
    }

    return input;
});

      

So the question is, is this the only way to get this to work?

BTW, this was done in my unit test with a single piece of data passing through it to try and debug the pipeline behavior. Each block was tested individually with several unit tests. So what happens in my unit test pipeline is that the assertion is hit before the block has finished executing and therefore fails.

If I remove the links to block2 and block3 and debug the test using linkOptions, it works fine.

+3




2 answers


Ok. So I must first thank Corey. When I first read his comment, I got a little angry because I felt that my code illustrated the concept well enough and could easily be turned into a working version. Be that as it may, his comment made me feel I should produce a complete, testable version that I could post.

In my test, the amazing part was that even though it mimicked my real code, the path I thought had failed passed, and the path I thought had passed failed. This made my head spin a little. So I started making several permutations of the original code. Basically, I created blocks that were synchronous and blocks that were asynchronous, and built both types of pipeline. There are four in total, two synchronous and two asynchronous: in each pair, one uses link options to propagate completion and the other uses the completion tasks in the MainBlock as shown.

After adding some work to the async tasks, I found that the synchronous versions passed the test and the asynchronous versions failed.

So the eventual solution to the problem was none of the above. As it turns out, the propagating link options (at least that's my guess) will propagate completion to blocks that don't satisfy the predicate in LinkTo. So when a Thing with a Status of Complete comes down, it goes to Block1.

Oh, I should point out that in the full test code I linked Block1, Block2, and Block3 to the same EndBlock, which is not shown in the code above.

In any case, after the predicate is executed and the Thing goes to Block1, it seems to me that Block2 and Block3 are completed. This causes the EndBlock we are awaiting in the unit test to complete, and the Assert fails because Block1 hasn't done its job yet.



If my guess is correct, I feel like it is a bug or design flaw in propagating link completion. Why does a block have to be complete if it has never been used?

So here's what I did to solve the problem. I took out the link options and wired up the completions manually, like this:

MainBlock.Completion.ContinueWith(t =>
{
    Block1.Complete();
    Block2.Complete();
    Block3.Complete();
});

Task.WhenAll(Block1.Completion, Block2.Completion, Block3.Completion)
.ContinueWith(t =>
{
    EndBlock.Complete();
});

      

It worked well, and it also worked when I switched to my real code. Task.WhenAll is what led me to believe the unused blocks were being completed, and that automatic propagation was the problem.

Hope this helps someone. I will come back and add a link once I have posted all my test code.

Edit: Here is a link to the test gist code https://gist.github.com/jmichas/bfab9cec84f0d1e40e12

+1




Your problem is not with the code in your question, which works correctly: when the main block completes, all three following blocks are also marked for completion.

The problem is with the end block: you are using PropagateCompletion there as well, which means that when any one of the three previous blocks completes, the end block is marked for completion. You want to mark it for completion when all three blocks have completed, and the Task.WhenAll().ContinueWith() combination from your answer does that (although the first part of that snippet is unnecessary, since it does the same thing as PropagateCompletion).
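To illustrate the failure mode described above, here is a minimal sketch. The block names (`slow`, `idle`, `end`) and the 200 ms delay are assumptions made for the example, not code from the question. Two sources are linked to one target with PropagateCompletion; the idle source completes first and takes the target down with it, so the slow source's item is most likely never received.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class EarlyCompletionDemo
{
    // Returns how many items the end block received before it completed.
    public static async Task<int> RunAsync()
    {
        var options = new DataflowLinkOptions { PropagateCompletion = true };
        int received = 0;

        // Hypothetical stand-ins for Block1 (busy) and Block2 (never used).
        var slow = new TransformBlock<int, int>(async n =>
        {
            await Task.Delay(200); // simulate asynchronous work
            return n;
        });
        var idle = new TransformBlock<int, int>(n => n);
        var end = new ActionBlock<int>(n => { Interlocked.Increment(ref received); });

        slow.LinkTo(end, options);
        idle.LinkTo(end, options);

        slow.Post(1);
        slow.Complete();
        idle.Complete(); // idle has nothing to do, so it completes right away

        // end is completed as soon as idle completes, while slow is still working.
        await end.Completion;
        return Volatile.Read(ref received);
    }
}
```

Note that `slow.Completion` is deliberately not awaited here: its output can no longer be delivered to the completed target, so that task would not finish.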

As it turns out, the propagating link options (at least that's my guess) will propagate completion to blocks that don't satisfy the predicate in LinkTo.

Yes, completion is always propagated. Completion has no item associated with it, so it doesn't make sense to apply a predicate to it. Maybe the fact that you only ever have one item (which is not common) makes this more confusing for you?
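A small sketch of this behavior, using hypothetical `evens` and `odds` targets that are not part of the question: only an even number is posted, yet the `odds` block still completes because completion travels along every link regardless of its predicate.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PredicateCompletionDemo
{
    // Returns the number of items the odds block processed and its final status.
    public static async Task<(int OddsProcessed, TaskStatus OddsStatus)> RunAsync()
    {
        var options = new DataflowLinkOptions { PropagateCompletion = true };
        int oddsProcessed = 0;

        var source = new BufferBlock<int>();
        var evens = new ActionBlock<int>(n => { });
        var odds = new ActionBlock<int>(n => { Interlocked.Increment(ref oddsProcessed); });

        // The predicates filter items only; they play no part in completion.
        source.LinkTo(evens, options, n => n % 2 == 0);
        source.LinkTo(odds, options, n => n % 2 != 0);

        source.Post(2);    // matches only the evens link
        source.Complete();

        // Both targets complete, including the one that never saw an item.
        await Task.WhenAll(evens.Completion, odds.Completion);
        return (Volatile.Read(ref oddsProcessed), odds.Completion.Status);
    }
}
```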

If my guess is correct, I feel like it is a bug or design flaw in propagating link completion. Why does a block have to be complete if it has never been used?

Why not? It makes sense to me: even if there were no items with Status.Delayed this time, you still want to complete the block that processes those items, so that any following code can know that all the delayed items have already been processed. The fact that there were none is irrelevant.




In any case, if you encounter this frequently, you may want to create a helper method that links multiple source blocks to a single target block and propagates completion correctly:

public static void LinkTo<T>(
    this IReadOnlyCollection<ISourceBlock<T>> sources, ITargetBlock<T> target,
    bool propagateCompletion)
{
    foreach (var source in sources)
    {
        source.LinkTo(target);
    }

    if (propagateCompletion)
        Task.WhenAll(sources.Select(source => source.Completion))
            .ContinueWith(_ => target.Complete());
}

      

Usage:

new[] { block1, block2, block3 }.LinkTo(endBlock, propagateCompletion: true);
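One caveat worth noting: the helper above calls Complete() on the target even if one of the sources has faulted, whereas PropagateCompletion on a single link would fault the target. A possible variant that forwards faults is sketched below; the name `LinkAllTo` is hypothetical, chosen here only to avoid clashing with the helper above, and it always propagates completion.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class MultiSourceLinkExtensions
{
    // Hypothetical helper, not part of TPL Dataflow.
    public static void LinkAllTo<T>(
        this IReadOnlyCollection<ISourceBlock<T>> sources, ITargetBlock<T> target)
    {
        foreach (var source in sources)
        {
            // Plain links: completion is handled once, below.
            source.LinkTo(target);
        }

        Task.WhenAll(sources.Select(source => source.Completion))
            .ContinueWith(t =>
            {
                if (t.Exception != null)
                    target.Fault(t.Exception); // at least one source faulted
                else
                    target.Complete();         // all sources finished
            });
    }
}
```

It is called the same way, for example `new[] { block1, block2, block3 }.LinkAllTo(endBlock);`.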

      

+4

