How to: Implement a Continuous Producer and Consumer Pattern in a Windows Service
Here's what I'm trying to do:
- Keep an in-memory queue of the items to be processed (i.e. IsProcessed = 0).
- Every 5 seconds, get the unprocessed items from the db and, if they are not already in the queue, add them.
- Continuously pull items from the queue, process them, and each time an item is processed, update it in the db (IsProcessed = 1).
- Make it all "as parallel as possible".
I have a constructor for my service like

public MyService()
{
    Ticker.Elapsed += FillQueue;
}

and I start this timer when the service starts like

protected override void OnStart(string[] args)
{
    Ticker.Enabled = true;
    Task.Run(() => { ConsumeWork(); });
}
and my FillQueue is like

private static void FillQueue(object source, ElapsedEventArgs e)
{
    var items = GetUnprocessedItemsFromDb();
    foreach (var item in items)
    {
        if (!Work.Contains(item))
        {
            Work.Enqueue(item);
        }
    }
}
and my ConsumeWork is like

private static void ConsumeWork()
{
    while (true)
    {
        if (Work.Count > 0)
        {
            var item = Work.Peek();
            Process(item);
            Work.Dequeue();
        }
        else
        {
            Thread.Sleep(500);
        }
    }
}
However, this is probably a naive implementation, and I'm wondering if .NET has any class type that is exactly what I need for this type of situation.
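For reference, .NET does ship a type built for exactly this situation: BlockingCollection<T>, which wraps a ConcurrentQueue<T> and gives blocking Add/Take semantics, so the Peek/Sleep polling loop disappears. Below is a minimal sketch of the pattern described above; names like QueueSketch and Produce are placeholders, not from the question.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class QueueSketch
{
    // bounded queue: a slow consumer applies back-pressure to the producer
    private static readonly BlockingCollection<int> _work =
        new BlockingCollection<int>(new ConcurrentQueue<int>(), boundedCapacity: 100);

    // processed items land here so the sketch has an observable result
    public static readonly ConcurrentQueue<int> Processed = new ConcurrentQueue<int>();

    // the timer callback would call this instead of Queue.Enqueue
    public static void Produce(int item) => _work.Add(item); // blocks when full

    // replaces the Peek/Process/Dequeue/Sleep loop
    public static Task StartConsumer() => Task.Run(() =>
    {
        // blocks when empty; the loop ends after CompleteAdding() is called
        foreach (var item in _work.GetConsumingEnumerable())
        {
            Processed.Enqueue(item);
        }
    });

    public static void Shutdown() => _work.CompleteAdding();
}
```

Several consumers can iterate GetConsumingEnumerable() concurrently, which covers the "as parallel as possible" requirement without any extra locking.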
Although @JSteward's answer is a good start, you can improve it by mixing TPL Dataflow with the Rx.NET extensions: a dataflow block can easily become an observer of your data, and an Rx timer will take much less effort than a hand-rolled one (see Observable.Timer).
We can adapt the MSDN article to your needs, for example:
private const int EventIntervalInSeconds = 5;
private const int DueIntervalInSeconds = 60;

var source =
    // sequence of Int64 numbers, starting from 0
    // https://msdn.microsoft.com/en-us/library/hh229435.aspx
    Observable.Timer(
        // fire the first event after waiting 1 minute
        TimeSpan.FromSeconds(DueIntervalInSeconds),
        // fire subsequent events every 5 seconds
        TimeSpan.FromSeconds(EventIntervalInSeconds))
    // attach a timestamp to each number
    .Timestamp()
    // each time, select some items to process
    .SelectMany(GetItemsFromDB)
    // filter out items that are already in flight
    .Where(i => !_processedItemIds.ContainsKey(i.Id));

var action = new ActionBlock<Item>(ProcessItem, new ExecutionDataflowBlockOptions
{
    // start as many item processings as there are processor cores
    MaxDegreeOfParallelism = Environment.ProcessorCount,
});

IDisposable subscription = source.Subscribe(action.AsObserver());

Also, your check for an already-processed item is not entirely accurate: an item can be selected as unprocessed from the db after you have finished processing it but before you have updated it in the database. In that case the item would be removed from the Queue<T> and then added again by the producer, so I added a concurrent cache of in-flight ids to this solution (a ConcurrentDictionary<Guid, byte> used as a set, because HashSet<T> is not thread-safe and ConcurrentBag<T> cannot remove a specific item):

private static async Task ProcessItem(Item item)
{
    // TryAdd returns false if the id is already in the cache
    if (!_processedItemIds.TryAdd(item.Id, 0))
    {
        return;
    }
    // actual work here
    // save item as processed in database
    // wait, to ensure the item cannot reappear in the queue from a stale poll
    await Task.Delay(TimeSpan.FromSeconds(EventIntervalInSeconds * 2));
    // clear the processed id from the cache to reduce memory usage
    _processedItemIds.TryRemove(item.Id, out _);
}

public class Item
{
    public Guid Id { get; set; }
}

// temporary cache for the ids of items in process
private static readonly ConcurrentDictionary<Guid, byte> _processedItemIds =
    new ConcurrentDictionary<Guid, byte>();

private static IEnumerable<Item> GetItemsFromDB(Timestamped<long> time)
{
    // log the event timing
    Console.WriteLine($"Event # {time.Value} at {time.Timestamp}");
    // return items from the DB
    return new[] { new Item { Id = Guid.NewGuid() } };
}
You can implement cache cleanup in another way, for example by starting a "GC" timer that regularly removes processed items from the cache.
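A sketch of that "GC timer" idea: record when each item finished processing and sweep old entries on a timer, instead of delaying inside ProcessItem. All names here are illustrative, not part of the answer above.

```csharp
using System;
using System.Collections.Concurrent;

public static class ProcessedCache
{
    private static readonly ConcurrentDictionary<Guid, DateTime> _finishedAt =
        new ConcurrentDictionary<Guid, DateTime>();

    public static int Count => _finishedAt.Count;

    // called when an item is picked up; false means it is already in flight
    public static bool TryAdd(Guid id) => _finishedAt.TryAdd(id, DateTime.MaxValue);

    // called right after the database row is marked IsProcessed = 1
    public static void MarkFinished(Guid id) => _finishedAt[id] = DateTime.UtcNow;

    // hook this up to a System.Threading.Timer firing every few minutes
    public static void Sweep(TimeSpan maxAge)
    {
        var cutoff = DateTime.UtcNow - maxAge;
        foreach (var pair in _finishedAt)
        {
            // entries still in flight have DateTime.MaxValue and are never swept
            if (pair.Value < cutoff)
            {
                _finishedAt.TryRemove(pair.Key, out _);
            }
        }
    }
}
```

The advantage over a fixed Task.Delay per item is that the cache size stays bounded by the sweep interval rather than by the number of items processed in any two-poll window.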
To stop producing events and processing items, you should Dispose the subscription and, possibly, Complete the ActionBlock:

subscription.Dispose();
action.Complete();
// optionally wait for in-flight items to finish
await action.Completion;
More information about Rx.NET can be found in the guide on GitHub.
You can use an ActionBlock to do your processing; it has a built-in queue that you can post work to. You can read up on TPL Dataflow here: Intro to TPL Dataflow, and also Introduction to Dataflow, Part 1. Finally, below is a quick sample to get you moving. It leaves a lot out, but should at least get you started.
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace MyWorkProcessor {

    public class WorkProcessor {

        public WorkProcessor() {
            cts = new CancellationTokenSource();
            Processor = CreatePipeline();
        }

        public async Task StartProcessing() {
            try {
                await Task.Run(() => GetWorkFromDatabase());
            } catch (OperationCanceledException) {
                //handle cancellation
            }
        }

        private CancellationTokenSource cts {
            get;
            set;
        }

        private ITargetBlock<WorkItem> Processor {
            get;
        }

        private TimeSpan DatabasePollingFrequency {
            get;
        } = TimeSpan.FromSeconds(5);

        private ITargetBlock<WorkItem> CreatePipeline() {
            var options = new ExecutionDataflowBlockOptions() {
                //prevent the block's input queue from growing without bound
                BoundedCapacity = 100,
                CancellationToken = cts.Token
            };
            return new ActionBlock<WorkItem>(item => ProcessWork(item), options);
        }

        private async Task GetWorkFromDatabase() {
            while (!cts.IsCancellationRequested) {
                var work = await GetWork();
                //SendAsync honors BoundedCapacity by waiting for free space
                await Processor.SendAsync(work);
                await Task.Delay(DatabasePollingFrequency, cts.Token);
            }
        }

        private async Task<WorkItem> GetWork() {
            //Context stands in for your data-access layer
            return await Context.GetWork();
        }

        private void ProcessWork(WorkItem item) {
            //do processing
        }
    }
}
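Whichever pipeline you choose, it still has to be wired into the Windows service lifecycle. Here is a hedged sketch of that wiring; MyQueueService and DoWorkAsync are made-up names, and in a real service the class would derive from System.ServiceProcess.ServiceBase with OnStart/OnStop as protected overrides (they are public here only so the sketch is easy to exercise).

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public class MyQueueService // in a real service: ServiceBase, with overrides
{
    private CancellationTokenSource _cts;
    private Task _worker;

    public bool WorkerCompleted => _worker != null && _worker.IsCompleted;

    public void OnStart(string[] args)
    {
        _cts = new CancellationTokenSource();
        // keep the task so OnStop can observe its completion or faults
        _worker = Task.Run(() => DoWorkAsync(_cts.Token));
    }

    public void OnStop()
    {
        _cts.Cancel();
        try
        {
            // give in-flight work a bounded amount of time to drain
            _worker.Wait(TimeSpan.FromSeconds(30));
        }
        catch (AggregateException)
        {
            // cancellation surfaces here; log anything unexpected
        }
    }

    private async Task DoWorkAsync(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            // poll the database / drain the queue here
            await Task.Delay(TimeSpan.FromSeconds(5), token);
        }
    }
}
```

Capturing the worker task matters: if you fire-and-forget it (as in the question's OnStart), an exception in the loop dies silently and the service keeps running while doing nothing.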