Reactive pipeline - how to manage parallelism?
I am building a simple processing pipeline where an item is retrieved as input, passed through multiple processors sequentially, and finally output.
How it currently works: the pipeline retrieves items from the provider as quickly as possible. Once an item is retrieved, it is passed to the processors. After the item is processed, a notification is displayed. While a single item is processed sequentially, multiple items can be processed in parallel (depending on how quickly they arrive from the provider).
The IObservable created and returned by the pipeline looks like this:
return Observable.Create<T>(async observer =>
{
    while (_provider.HasNext)
    {
        T item = await _provider.GetNextAsync();
        observer.OnNext(item);
    }
}).SelectMany(item => Observable.FromAsync(() =>
    _processors.Aggregate(
        seed: Task.FromResult(item),
        func: (current, processor) => current
            .ContinueWith(previous => processor.ProcessAsync(previous.Result)) // Append continuations.
            .Unwrap()))); // Unwrap Task<T> from Task<Task<T>>.
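The Aggregate call above folds the item through each async processor in turn, each one consuming the previous one's output. The same fold can be sketched outside Rx; here is a rough Python asyncio analogue, where `add_one` and `double` are hypothetical stand-ins for the processors:

```python
import asyncio

# Hypothetical async processors standing in for IProcessor.ProcessAsync.
async def add_one(x):
    return x + 1

async def double(x):
    return x * 2

async def run_processors(item, processors):
    # Thread the item through each processor in turn; each processor
    # consumes the previous processor's output, mirroring the
    # Aggregate/ContinueWith chain.
    result = item
    for processor in processors:
        result = await processor(result)
    return result

print(asyncio.run(run_processors(3, [add_one, double])))  # 3 -> 4 -> 8
```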
The missing part: I need a control mechanism that limits how many items (at most) can be in the pipeline at any given time.
For example, if the maximum parallel processing is 3, this would result in the following workflow:
- Item 1 is retrieved and passed to processors.
- Item 2 is retrieved and passed to the processors.
- Item 3 is retrieved and passed to processors.
- Item 1 processing completed.
- Item 4 is retrieved and passed to the processors.
- Item 3 processing completed.
- Item 5 is retrieved and passed to the processors.
- Etc ...
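The desired invariant (never more than 3 items inside the pipeline, a new item entering only when one completes) can be expressed generically; a minimal Python asyncio sketch, assuming a semaphore guards the "in the pipeline" region (all names are hypothetical):

```python
import asyncio

async def bounded_pipeline(items, max_in_flight=3):
    # At most `max_in_flight` items are inside the pipeline at once;
    # a new item enters only when a previous one completes.
    semaphore = asyncio.Semaphore(max_in_flight)
    in_flight = 0
    peak = 0
    results = []

    async def process(item):
        nonlocal in_flight, peak
        async with semaphore:        # Wait for a free pipeline slot.
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0)   # Stand-in for the async processor chain.
            results.append(item)
            in_flight -= 1

    await asyncio.gather(*(process(i) for i in items))
    return results, peak

results, peak = asyncio.run(bounded_pipeline(range(1, 11)))
print(peak)  # Concurrency never exceeded 3.
```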
Merge provides an overload that accepts a maximum concurrency. Its signature looks like this:

IObservable<T> Merge<T>(this IObservable<IObservable<T>> source, int maxConcurrency);
This is how it would look with your example (I refactored other code as well, which you can take or leave):
return Observable
    // The reactive while loop also takes care of the OnCompleted for you.
    .While(() => _provider.HasNext,
        Observable.FromAsync(_provider.GetNextAsync))
    // Wrap each item in an observable that will only execute once subscribed.
    .Select(item => Observable.Defer(() =>
        _processors.Aggregate(
            seed: Observable.Return(item),
            func: (current, processor) => current.SelectMany(processor.ProcessAsync))))
    // Only allow 3 streams to execute in parallel.
    .Merge(3);
To break down what it does:

- While checks on each iteration whether _provider.HasNext is true; if so, it re-subscribes to get the next value from _provider, otherwise it emits OnCompleted.
- Inside the Select, a new observable stream is created, but not yet evaluated, thanks to Defer.
- The resulting IObservable<IObservable<T>> is passed to Merge, which subscribes to a maximum of 3 observables at the same time.
- Each inner observable is finally evaluated when it is subscribed to.
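The Merge(3) semantics (subscribe to at most three inner sequences at once, starting the next as soon as one completes) can be imitated outside Rx. Below is a rough Python asyncio analogue, where each factory lambda plays the role of a Defer-wrapped inner observable and invoking it plays the role of subscription; `merge` and `square` are hypothetical names:

```python
import asyncio

async def merge(factories, max_concurrency=3):
    # Evaluate deferred inner sequences with at most `max_concurrency`
    # running at once; each worker starts the next deferred item only
    # when its previous one has completed.
    pending = iter(factories)
    results = []

    async def worker():
        for factory in pending:              # Shared iterator: pull the next item.
            results.append(await factory())  # "Subscribe" = invoke the factory.

    await asyncio.gather(*(worker() for _ in range(max_concurrency)))
    return results

async def square(i):
    await asyncio.sleep(0)  # Simulated async processing.
    return i * i

# Like Defer: nothing runs until merge invokes the factory.
factories = [lambda i=i: square(i) for i in range(10)]
print(sorted(asyncio.run(merge(factories))))
```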
Alternative 1

If you also need to control the number of concurrent requests, what you need is a little more complicated, since you will need to signal that your Observable is ready for new values:
return Observable.Create<T>(observer =>
{
    var subject = new Subject<Unit>();
    var disposable = new CompositeDisposable(subject);
    disposable.Add(subject
        // This will complete when the provider has run out of values.
        .TakeWhile(_ => _provider.HasNext)
        .SelectMany(
            _ => _provider.GetNextAsync(),
            (_, item) =>
            {
                return _processors
                    .Aggregate(
                        seed: Observable.Return(item),
                        func: (current, processor) => current.SelectMany(processor.ProcessAsync))
                    // Could also use `Finally` here; this signals the chain
                    // to start on the next item.
                    .Do(dontCare => { }, () => subject.OnNext(Unit.Default));
            }
        )
        .Merge(3)
        .Subscribe(observer));

    // Queue up 3 requests for the initial kickoff.
    disposable.Add(Observable.Repeat(Unit.Default, 3).Subscribe(subject.OnNext));

    return disposable;
});
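The token-recharging idea above (seed three "go" signals, and have each completed item push the next one) translates outside Rx as well. Here is a rough Python asyncio sketch, assuming a token queue in the role of the Subject<Unit>; `run_pipeline` and `consume` are hypothetical names:

```python
import asyncio

async def run_pipeline(items, max_in_flight=3):
    # A token queue plays the role of the Subject<Unit>: an item may
    # only be pulled from the provider when a token is available, and
    # every finished item puts a token back.
    tokens = asyncio.Queue()
    for _ in range(max_in_flight):   # Queue up 3 tokens for the kickoff.
        tokens.put_nowait(None)

    provider = iter(items)
    results = []

    async def consume():
        while True:
            await tokens.get()           # Wait until the pipeline is ready.
            try:
                item = next(provider)    # TakeWhile(_provider.HasNext) analogue.
            except StopIteration:
                return                   # Provider exhausted: this consumer stops.
            await asyncio.sleep(0)       # Simulated processor chain.
            results.append(item)
            tokens.put_nowait(None)      # Signal: start the next item.

    await asyncio.gather(*(consume() for _ in range(max_in_flight)))
    return results

print(sorted(asyncio.run(run_pipeline(range(1, 8)))))
```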
You may need to change the code you posted, but this will be one way to do it:
var eventLoopScheduler = new EventLoopScheduler();

(from semaphore in Observable.Return(new Semaphore(2, 2))
 from input in GetInputObs()
 from getAccess in Observable.Start(() => semaphore.WaitOne(), eventLoopScheduler)
 from output in ProcessInputOnPipeline(input)
     .SubscribeOn(Scheduler.Default)
     .Finally(() => semaphore.Release())
 select output)
.Subscribe(x => Console.WriteLine(x), ex => { });
I modeled your pipeline as one observable (which would actually consist of several smaller observables chained together).

The key is to make sure that the semaphore is released regardless of how the pipeline terminates (Empty/Error), otherwise the stream may hang, which is why the Release() call on the semaphore sits in Finally(). (It might also be worth adding a timeout to the pipeline observable in case it never emits OnCompleted()/OnError().)
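The release-in-Finally discipline looks the same in any language: the release must run on both success and failure, or a slot leaks and the pipeline eventually stalls. A minimal Python sketch of that guarantee, with a deliberately failing processor (all names hypothetical):

```python
import asyncio

async def process(item, semaphore):
    # Hypothetical processor: fails on the marker item, like an
    # OnError terminating the pipeline for that input.
    await semaphore.acquire()
    try:
        if item == "bad":
            raise RuntimeError("pipeline error")
        await asyncio.sleep(0)   # Simulated pipeline work.
        return item.upper()
    finally:
        semaphore.release()      # Runs on success AND on error, like Finally().

async def main():
    # Semaphore(1): if the failed item leaked its slot, "b" would hang forever,
    # so "b" completing proves the release happened.
    semaphore = asyncio.Semaphore(1)
    results = []
    for item in ["a", "bad", "b"]:
        try:
            results.append(await process(item, semaphore))
        except RuntimeError:
            results.append(None)
    return results

print(asyncio.run(main()))  # ['A', None, 'B']
```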
Edit:
As per the comments below, I've added some scheduling around semaphore access so that we don't block whoever is pushing these inputs into our stream. I used an EventLoopScheduler so that all requests to access the semaphore are queued and executed on a single thread.
Edit: I prefer Paul's answer though - simple, less scheduling, less synchronization (the merge uses an internal queue).