Handle ConcurrentStack when not empty?

I have a ConcurrentStack that I am throwing items into. What's a good way to handle these items one at a time when the stack isn't empty? I would like to do it in such a way as not to overwhelm the CPU cycles when the stack is not being processed.

What I've gotten now is basically this and it doesn't seem like a perfect solution.

private void AddToStack(MyObj obj)
{
    stack.Push(obj);
    HandleStack();
}

private void HandleStack()
{
    if (handling)
        return;

    Task.Run( () =>
    {
        lock (lockObj)
        {
            handling = true;
            if (stack.Any())
            {
                //handle whatever is on top of the stack
            }
            handling = false;
        }
    }
}

      

This way the bool exists, so multiple threads don't get backed up while waiting for a lock. But I don't want multiple things to handle the stack at once, hence the lock. So if two separate threads end up calling HandleStack at the same time and passing the bool, the lock exists, so they don't both iterate through the stack at once. But as soon as the second one goes through the lock, the stack will be empty and will do nothing. So this ultimately gives me the behavior I want.

So really, I'm just writing a pseudo-parallel wrapper around the ConcurrentStack, and it looks like it requires a different way. Thoughts?

+3


source to share


3 answers


ConcurrentStack<T>

is one of the collections that implements IProducerConsumerCollection<T>

, and as such can be wrapped BlockingCollection<T>

. BlockingCollection<T>

has several convenience members for common operations such as "consume while the stack is not empty". For example, you can call TryTake

in a loop. Or you can just use GetConsumingEnumerable

:



private BlockingCollection<MyObj> stack;
private Task consumer;

Constructor()
{
  stack = new BlockingCollection<MyObj>(new ConcurrentStack<MyObj>());
  consumer = Task.Run(() =>
  {
    foreach (var myObj in stack.GetConsumingEnumerable())
    {
      ...
    }
  });
}

private void AddToStack(MyObj obj)
{
  stack.Add(obj);
}

      

+2


source


You can use Microsoft TPL Dataflow to do this.

Here's a simple example showing how to create a queue. Try and play with the settings MaxDegreeOfParallelism

and BoundedCapacity

to see what happens.

In your example, I think you need to set MaxDegreeOfParallelism

to 1 if you don't want multiple threads to process an item at the same time.



(Note: you need to use .Net 4.5x and set up TPL dataflow for the project using Nuget.)

Also read Stephen Cleary's blog on TPL .

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace SimpleTPL
{
    class MyObj
    {
        public MyObj(string data)
        {
            Data = data;
        }

        public readonly string Data;
    }

    class Program
    {
        static void Main()
        {
            var queue = new ActionBlock<MyObj>(data => process(data), actionBlockOptions());
            var task = queueData(queue);

            Console.WriteLine("Waiting for task to complete.");
            task.Wait();
            Console.WriteLine("Completed.");
        }

        private static void process(MyObj data)
        {
            Console.WriteLine("Processing data " + data.Data);
            Thread.Sleep(200); // Simulate load.
        }

        private static async Task queueData(ActionBlock<MyObj> executor)
        {
            for (int i = 0; i < 20; ++i)
            {
                Console.WriteLine("Queuing data " + i);
                MyObj data = new MyObj(i.ToString());

                await executor.SendAsync(data);
            }

            Console.WriteLine("Indicating that no more data will be queued.");

            executor.Complete(); // Indicate that no more items will be queued.

            Console.WriteLine("Waiting for queue to empty.");

            await executor.Completion; // Wait for executor queue to empty.
        }

        private static ExecutionDataflowBlockOptions actionBlockOptions()
        {
            return new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,
                BoundedCapacity        = 8
            };
        }
    }
}

      

+3


source


Sounds like you want a typical consumer-producer.

I would recommend using autoresetevent

Ask your user to wait when the stack is empty. Call Set when calling producer method.

Read this thread

Fast and Best BlockingCollection Producer / Consumer Technique vs Parallel Queue

+2


source







All Articles