How to: Implement a Continuous Producer and Consumer Pattern in a Windows Service

Here's what I'm trying to do:

  • Store an in-memory queue of the items to be processed (i.e. IsProcessed = 0)
  • Every 5 seconds, get unprocessed items from the db and, if they are not already in the queue, add them
  • Continuously pull items from the queue, process them, and each time an item is processed, update it in the db (IsProcessed = 1)
  • Make it all "as parallel as possible"
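
The steps above can be sketched with .NET's built-in producer/consumer container, BlockingCollection<T>. This is an illustrative, self-contained sketch, not the question's code: the int item type and the summing "processing" are placeholders.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class Sketch
{
    // Thread-safe producer/consumer queue with blocking semantics.
    static readonly BlockingCollection<int> Work = new BlockingCollection<int>();

    static void Main()
    {
        // Consumer: GetConsumingEnumerable blocks until items arrive
        // and finishes once CompleteAdding has been called.
        var consumer = Task.Run(() =>
        {
            var sum = 0;
            foreach (var item in Work.GetConsumingEnumerable())
                sum += item;               // stand-in for "process the item"
            return sum;
        });

        // Producer: in the real service this loop would poll the db
        // every 5 seconds; here we just add a few items and finish.
        for (var i = 1; i <= 3; i++)
            Work.Add(i);
        Work.CompleteAdding();

        Console.WriteLine(consumer.Result); // prints 6
    }
}
```

Because GetConsumingEnumerable blocks while the collection is empty, the consumer needs no manual Sleep/poll loop.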

I have a constructor for my service like

public MyService()
{
    Ticker.Elapsed += FillQueue;
}

      

and I start this timer when the service starts like

protected override void OnStart(string[] args)
{
    Ticker.Enabled = true;
    Task.Run(() => { ConsumeWork(); });
}

      

and my FillQueue is like

private static async void FillQueue(object source, ElapsedEventArgs e)   
{
    var items = GetUnprocessedItemsFromDb();
    foreach(var item in items)
    {
        if(!Work.Contains(item))
        {
            Work.Enqueue(item);
        }   
    }
}

      

and my ConsumeWork is like

private static void ConsumeWork()
{
    while(true)
    {
        if(Work.Count > 0)
        {
            var item = Work.Peek();
            Process(item);
            Work.Dequeue();
        }
        else
        {
            Thread.Sleep(500);
        }
    }
}
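
Note that Queue<T> is not thread safe, so the FillQueue timer callback and this loop race on Enqueue/Peek/Dequeue. A minimal thread-safe variant of the loop above, sketched with ConcurrentQueue<T> and an int item type as a placeholder:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

class ConsumerSketch
{
    // ConcurrentQueue<T> is safe for a producer thread and a consumer
    // thread to use at the same time.
    static readonly ConcurrentQueue<int> Work = new ConcurrentQueue<int>();

    static void ConsumeWork()
    {
        while (true)
        {
            // TryDequeue atomically checks for and removes an item,
            // closing the Peek/Dequeue gap.
            if (Work.TryDequeue(out var item))
            {
                Process(item);
            }
            else
            {
                Thread.Sleep(500); // back off while the queue is empty
            }
        }
    }

    static void Process(int item) { /* placeholder */ }
}
```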

      

However, this is probably a naive implementation, and I'm wondering if .NET has any class that is designed for exactly this type of situation.



2 answers


Although @JSteward's answer is a good start, you can improve it by mixing TPL Dataflow with the Rx.NET extensions: a dataflow block can easily become an observer of your data, and an Rx Timer will take far less effort than hand-rolling one (see the Observable.Timer documentation).

We can adapt the MSDN article to your needs, for example:

private const int EventIntervalInSeconds = 5;
private const int DueIntervalInSeconds = 60;

var source =
    // sequence of Int64 numbers, starting from 0
    // https://msdn.microsoft.com/en-us/library/hh229435.aspx
    Observable.Timer(
        // fire first event after 1 minute waiting
        TimeSpan.FromSeconds(DueIntervalInSeconds),
        // fire all next events each 5 seconds
        TimeSpan.FromSeconds(EventIntervalInSeconds))
    // each number will have a timestamp
    .Timestamp()
    // each time we select some items to process
    .SelectMany(GetItemsFromDB)
    // filter already added
    .Where(i => !_processedItemIds.ContainsKey(i.Id));

var action = new ActionBlock<Item>(ProcessItem, new ExecutionDataflowBlockOptions
    {
        // we can run as many concurrent item processings as there are processor cores
        MaxDegreeOfParallelism = Environment.ProcessorCount,
    });

IDisposable subscription = source.Subscribe(action.AsObserver());

Also, your check for an already-processed item is not entirely accurate: an item can be selected as unprocessed from the db in the window after you finish processing it but before its IsProcessed flag is updated in the database. In that case the item would be dequeued and then added again by the producer, so I added a ConcurrentDictionary<Guid, byte> to this solution as a temporary cache (HashSet<T> is not thread safe, and ConcurrentBag<T> cannot remove a specific item):

private static async Task ProcessItem(Item item)
{
    // TryAdd atomically checks and reserves the id; if it is already
    // in the cache, the item is being (or was just) processed
    if (!_processedItemIds.TryAdd(item.Id, 0))
    {
        return;
    }

    // actual work here

    // save item as processed in database

    // we need to wait to ensure the item does not appear in the queue again
    await Task.Delay(TimeSpan.FromSeconds(EventIntervalInSeconds * 2));

    // clear the processed cache to reduce memory usage
    _processedItemIds.TryRemove(item.Id, out _);
}

public class Item
{
    public Guid Id { get; set; }
}

// temporary cache for the ids of items in process
private static ConcurrentDictionary<Guid, byte> _processedItemIds =
    new ConcurrentDictionary<Guid, byte>();

private static IEnumerable<Item> GetItemsFromDB(Timestamped<long> time)
{
    // log event timing
    Console.WriteLine($"Event # {time.Value} at {time.Timestamp}");

    // return items from DB
    return new[] { new Item { Id = Guid.NewGuid() } };
}

      



You can implement the cache cleanup in another way, for example by starting a "GC" timer that regularly removes processed items from the cache.
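
A sketch of that timer-based cleanup, assuming the cache is changed to map each item id to the time its processing finished (the member names and the TTL value here are illustrative assumptions, not from the answer):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;

class ProcessedCacheCleaner
{
    // id -> time the item finished processing (assumed cache shape)
    static readonly ConcurrentDictionary<Guid, DateTime> Cache =
        new ConcurrentDictionary<Guid, DateTime>();

    // keep entries for twice the polling interval, like the Task.Delay above
    static readonly TimeSpan Ttl = TimeSpan.FromSeconds(10);

    // Periodically evict entries old enough that the db update is
    // guaranteed to be visible to the next producer poll.
    static void Sweep(object state)
    {
        var cutoff = DateTime.UtcNow - Ttl;
        foreach (var stale in Cache.Where(kv => kv.Value < cutoff).ToList())
            Cache.TryRemove(stale.Key, out _);
    }

    public static Timer Start() => new Timer(Sweep, null, Ttl, Ttl);
}
```

This trades the per-item Task.Delay for one shared timer, so each processing task can complete as soon as its db write is done.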

To stop the events and the item processing, you should Dispose the subscription and, possibly, Complete the ActionBlock:

subscription.Dispose();
action.Complete();
// optionally wait for already-queued items to finish
await action.Completion;

      

More information about Rx.NET can be found in the guide on GitHub.



You can use an ActionBlock to do your processing; it has a built-in queue that you can post work to. You can read about TPL Dataflow here: Intro to TPL Dataflow, and also Introduction to Dataflow, Part 1. Finally, here is a quick sample to help you get moving. I've left a lot out, but it should at least get you started.



using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace MyWorkProcessor {

    public class WorkProcessor {

        public WorkProcessor() {
            // initialize the token source before building the pipeline,
            // which reads cts.Token
            cts = new CancellationTokenSource();
            Processor = CreatePipeline();
        }

        public async Task StartProcessing() {
            try {
                await Task.Run(() => GetWorkFromDatabase());
            } catch (OperationCanceledException) {
                //handle cancel
            }
        }

        private CancellationTokenSource cts {
            get;
            set;
        }

        private ITargetBlock<WorkItem> Processor {
            get;
        }

        private TimeSpan DatabasePollingFrequency {
            get;
        } = TimeSpan.FromSeconds(5);

        private ITargetBlock<WorkItem> CreatePipeline() {
            var options = new ExecutionDataflowBlockOptions() {
                BoundedCapacity = 100,
                CancellationToken = cts.Token
            };
            return new ActionBlock<WorkItem>(item => ProcessWork(item), options);
        }

        private async Task GetWorkFromDatabase() {
            while (!cts.IsCancellationRequested) {
                var work = await GetWork();
                await Processor.SendAsync(work);
                await Task.Delay(DatabasePollingFrequency);
            }
        }

        private async Task<WorkItem> GetWork() {
            // Context here stands for your data access layer (e.g. an EF DbContext)
            return await Context.GetWork();
        }

        private void ProcessWork(WorkItem item) {
            //do processing
        }
    }
}
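
To see the answer's polling loop in isolation, here is a self-contained variant with the database call stubbed out (all names and intervals here are illustrative; it needs the System.Threading.Tasks.Dataflow package):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class PollingLoopSketch {
    static async Task Main() {
        var cts = new CancellationTokenSource();
        var processed = 0;

        // the "pipeline": processing just counts items here
        var processor = new ActionBlock<int>(
            _ => Interlocked.Increment(ref processed));

        // poll loop: fetch, post, wait; SendAsync gives back-pressure
        // if the block is created with a BoundedCapacity
        async Task Poll() {
            while (!cts.IsCancellationRequested) {
                var work = await Task.FromResult(42);            // stubbed db call
                await processor.SendAsync(work);
                await Task.Delay(TimeSpan.FromMilliseconds(10)); // stands in for 5 s
            }
        }

        var loop = Poll();
        await Task.Delay(100);
        cts.Cancel();
        await loop;

        processor.Complete();
        await processor.Completion;
        Console.WriteLine(processed > 0); // prints True
    }
}
```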

      
