How to balance read and write streams in .NET.

Problem: Data to be read is accumulating while waiting to be written.

I have a basic ETL process that reads in a file, transforms data, and then writes data to another file. Since I am on a multi-core system, I am trying to accomplish this using multiple threads. My problem is that readers are ahead of authors: many files end up being read and their data gets transformed, but they accumulate waiting to be written.

What I want is a balance between files read and files written, but it uses multiple threads.

I have tried different things in the .NET library (C # 4.0). I think there is something that I don't understand and that it must be more difficult than just using Thread

or ThreadPool.QueueUserWorkItem

or Task

how they appear in the basic examples I found.

For example, let's say I try something like this:

Task task = new Task(() => PerformEtl(sourceFile));
task.start();

      

If I log files read and files written, it looks like a 10 to 1 ratio. In the long run, this is unstable.

There must be some basic multithreading / multiprocessing pattern that I don't know or cannot remember. Does anyone know where to go? Thank.


Solved:

Thanks to @Blam.

Here is an example / pseudo code to illustrate how the producer-consumer pattern can be implemented using the .NET library as suggested by @Blam.

// Adapted from: https://msdn.microsoft.com/en-us/library/dd997371(v=vs.100).aspx
BlockingCollection<object> dataItems = new BlockingCollection<object>(10);
List<Task> tasks = new List<Task>();

tasks.Add(
    // Producer.
    Task.Factory.StartNew(() =>
    {
        for (;;)
        {
            string filePath = GetNextFile();
            if (filePath == null) break;

            object data = ProcessData(ReadData(file));
            dataItems.Add(data);
        }

        dataItems.CompleteAdding();
    })
);

tasks.Add(
    // Consumer.
    Task.Factory.StartNew(() =>
    {
        while (!dataItems.IsCompleted))
        {
            object data;

            try
            {
                data = dataItems.Take();
                WriteData(data);
            }
            catch(InvalidOperationException ioe)
            {
                Console.Error.WriteLine(ioe.Message);
            }
        }
    })
);

Task.WaitAll(tasks.ToArray());

      

The MSDN discussion is here: https://msdn.microsoft.com/en-us/library/dd997371(v=vs.100).aspx

+3


source to share


1 answer


I do exactly what I break into 3

  • Reading
    There is only one set of heads - doing it in parallel is no good. Close the file and pass the text to the next step.
  • Process
  • Recording

Use BlockingCollection with top (limited capacity)
With Upperbound, fast steps don't come close to slow



So you have multiple cores. You are probably related to IO.

You can process (step 2) in parallel, but if you don't have complex transformations this won't make a difference.

Try reading and writing on different physical devices.

+2


source







All Articles