TPL Data Flow Transform block post to batch block followed by actionblock

I have a TPL data flow based application that works fine using only a batch block and then an action block.

I added to TransformBlock to try and transform data from source before sending to batch block, but my action block never gets through. No errors and exceptions are thrown.

I'm not sure if I need to fill the transform block as it only hits once.

Is there some step I need to add to my conversion code other than the return object of the output type?

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        public const int BATCH_SIZE = 10;
        static void Main(string[] args)
        {
            Console.WriteLine("Application started");

            //Create the pipeline of actions
            var transformBlock = new TransformBlock<string, string>(input => TransformString(input), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
            var batchBlock = new BatchBlock<string>(BATCH_SIZE);
            var uploadFilesToAzureBlock = new ActionBlock<IEnumerable<string>>(strings => OutputStrings(strings), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });

            Console.WriteLine("Blocks created");

            //link the actions
            transformBlock.LinkTo(batchBlock);
            batchBlock.LinkTo(uploadFilesToAzureBlock);
            batchBlock.Completion.ContinueWith(obj => uploadFilesToAzureBlock.Complete());

            Console.WriteLine("Blocks linked");

            var testInputs = new List<string>
            {
                "Kyle",
                "Stephen",
                "Jon",
                "Conor",
                "Adrian",
                "Marty",
                "Richard",
                "Norbert",
                "Kerri",
                "Mark",
                "Declan",
                "Ray",
                "Paul",
                "Andrew",
                "Rachel",
                "David",
                "Darrell"
            };

            Console.WriteLine("Data created");

            var i = 0;
            foreach (var name in testInputs)
            {
                Console.WriteLine("Posting name {0}", i);
                transformBlock.Post(name);
                i++;
            }

            batchBlock.Complete();
            uploadFilesToAzureBlock.Completion.Wait();

            Console.WriteLine("Finishing");
            Console.ReadKey();
        }

        private static void OutputStrings(IEnumerable<string> strings)
        {
            Console.WriteLine("Beginning Batch...");

            foreach (var s in strings)
            {
                Console.WriteLine(s);
            }

            Console.WriteLine("Completing Batch...");
        }

        private static string TransformString(string input)
        {
            return input += " has been processed";
        }
    }
}

      

+3


source to share


1 answer


As mentioned in "usr" above, I did not propagate block completion. The following code works fine.



using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace DataflowTest
{
    class Program
    {
        public const int BATCH_SIZE = 10;
        static void Main(string[] args)
        {
            Console.WriteLine("Application started");

            //Create the pipeline of actions
            var transformBlock = new TransformBlock<string, string>(input => TransformString(input), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
            var batchBlock = new BatchBlock<string>(BATCH_SIZE);
            var outputStringsBlock = new ActionBlock<IEnumerable<string>>(strings => OutputStrings(strings), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

            Console.WriteLine("Blocks created");

            //link the actions
            transformBlock.LinkTo(batchBlock, new DataflowLinkOptions { PropagateCompletion = true });
            batchBlock.LinkTo(outputStringsBlock, new DataflowLinkOptions { PropagateCompletion = true });
            batchBlock.Completion.ContinueWith(obj => outputStringsBlock.Complete());

            Console.WriteLine("Blocks linked");

            var testInputs = new List<string>
            {
                "Kyle",
                "Stephen",
                "Jon",
                "Conor",
                "Adrian",
                "Marty",
                "Richard",
                "Norbert",
                "Kerri",
                "Mark",
                "Declan",
                "Ray",
                "Paul",
                "Andrew",
                "Rachel",
                "David",
                "Darrell"
            };

            Console.WriteLine("Data created");

            var i = 0;
            foreach (var name in testInputs)
            {
                Console.WriteLine("Posting name {0}", i);
                transformBlock.Post(name);
                i++;
            }

            transformBlock.Complete();
            outputStringsBlock.Completion.Wait();

            Console.WriteLine("Finishing");
            Console.ReadKey();
        }

        private static void OutputStrings(IEnumerable<string> strings)
        {
            Console.WriteLine("Beginning Batch...");
            Console.WriteLine("");

            foreach (var s in strings)
            {
                Console.WriteLine(s);
            }

            Console.WriteLine("");
            Console.WriteLine("Completing Batch...");
        }

        private static string TransformString(string input)
        {
            return input += " has been processed";
        }
    }
}

      

+2


source







All Articles