Reactive pipeline - how to manage parallelism?

I am building a simple processing pipeline in which an item is pulled from an input, passed through multiple processors sequentially, and finally output. Below is the general architecture:

rx-pipe

How it currently works: the pipeline retrieves items from the provider as quickly as possible. Once an item is retrieved, it is passed to the processors. After an item has been processed, a notification is displayed. While a single item is processed sequentially, multiple items can be processed in parallel (depending on how quickly they arrive from the provider).

The IObservable<T> created and returned by the pipeline looks like this:

return Observable.Create<T>(async observer =>
{
    while (_provider.HasNext)
    {
        T item = await _provider.GetNextAsync();
        observer.OnNext(item);
    }                
}).SelectMany(item => Observable.FromAsync(() =>
    _processors.Aggregate(
        seed: Task.FromResult(item),
        func: (current, processor) => current.ContinueWith( // Append continuations.
            previous => processor.ProcessAsync(previous.Result))
            .Unwrap()))); // Unwrap Task<T> from Task<Task<T>>.

      

The missing part: I need a mechanism that limits how many items (at most) can be in the pipeline at any given time.

For example, if the maximum parallel processing is 3, this would result in the following workflow:

  • Item 1 is retrieved and passed to the processors.
  • Item 2 is retrieved and passed to the processors.
  • Item 3 is retrieved and passed to the processors.
  • Item 1 processing completes.
  • Item 4 is retrieved and passed to the processors.
  • Item 3 processing completes.
  • Item 5 is retrieved and passed to the processors.
  • Etc.




2 answers


Merge provides an overload that accepts a maximum concurrency. Its signature looks like this:

IObservable<T> Merge<T>(this IObservable<IObservable<T>> source, int maxConcurrency);

This is how it would look with your example (I refactored other code as well, which you can take or leave):

return Observable
  // The reactive While loop also takes care of OnCompleted for you.
  .While(() => _provider.HasNext,
         Observable.FromAsync(_provider.GetNextAsync))
  // Defer each item's processing chain so it only executes on subscription.
  .Select(item => Observable.Defer(() =>
    _processors.Aggregate(
      seed: Observable.Return(item),
      func: (current, processor) => current.SelectMany(processor.ProcessAsync))))
  // Only allow 3 inner observables to execute in parallel.
  .Merge(3);

      

To break down what it does:



  • While checks on each iteration whether _provider.HasNext is true; if so, it re-subscribes to get the next value from _provider, otherwise it emits OnCompleted.

  • Inside the Select, a new observable sequence is created, but it is not yet evaluated, thanks to Defer.

  • The resulting IObservable<IObservable<T>> is passed to Merge, which subscribes to at most 3 inner observables at the same time.

  • Each inner observable is finally evaluated when it is subscribed to.
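
The concurrency cap is easy to verify in isolation. Below is a minimal, self-contained sketch (assuming the System.Reactive NuGet package; DoWorkAsync is a hypothetical stand-in for a processor, not code from the question) that records how many inner observables are in flight at once:

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

class MergeDemo
{
    static readonly object Gate = new object();
    static int _inFlight;
    static int _peak;

    // Hypothetical processor: tracks how many items run concurrently.
    static async Task<int> DoWorkAsync(int item)
    {
        lock (Gate) { _inFlight++; if (_inFlight > _peak) _peak = _inFlight; }
        await Task.Delay(100); // simulate asynchronous work
        lock (Gate) { _inFlight--; }
        return item;
    }

    static void Main()
    {
        Observable.Range(1, 10)
            .Select(i => Observable.Defer(() => Observable.FromAsync(() => DoWorkAsync(i))))
            .Merge(3) // at most 3 inner observables are subscribed at a time
            .Wait();  // block until the merged sequence completes

        Console.WriteLine($"peak concurrency: {_peak}"); // never exceeds 3
    }
}
```

Note that FromAsync alone is already lazy here; Defer just keeps the shape identical to the answer above, where the deferred work is a whole processor chain.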

Alternative 1

If you also need to control the number of concurrent requests, it is a little more complicated, since you will need a way to signal that your Observable is ready for new values:

return Observable.Create<T>(observer => 
{
  var subject = new Subject<Unit>();
  var disposable = new CompositeDisposable(subject);

  disposable.Add(subject
    //This will complete when provider has run out of values
    .TakeWhile(_ => _provider.HasNext)
    .SelectMany(
      _ => _provider.GetNextAsync(),
     (_, item) => 
     {
       return _processors
        .Aggregate(
         seed: Observable.Return(item),
         func: (current, processor) => current.SelectMany(processor.ProcessAsync))
        //Could also use `Finally` here, this signals the chain
        //to start on the next item.
        .Do(dontCare => {}, () => subject.OnNext(Unit.Default));
     }
    )
    .Merge(3)
    .Subscribe(observer));

  //Queue up 3 requests for the initial kickoff
  disposable.Add(Observable.Repeat(Unit.Default, 3).Subscribe(subject.OnNext));

  return disposable;
});

      



You may need to adapt the code you posted, but this would be one way to do it:

var eventLoopScheduler = new EventLoopScheduler();

(from semaphore in Observable.Return(new Semaphore(2, 2))
 from input in GetInputObs()
 from getAccess in Observable.Start(() => semaphore.WaitOne(), eventLoopScheduler)
 from output in ProcessInputOnPipeline(input)
        .SubscribeOn(Scheduler.Default)
        .Finally(() => semaphore.Release())
 select output)
 .Subscribe(x => Console.WriteLine(x), ex => {});

      

I modeled your pipeline as a single observable (which in practice would consist of several smaller observables chained together).

The main thing is to make sure that the semaphore is released regardless of how the pipeline terminates (OnCompleted/OnError), otherwise the pipeline may hang, which is why the Release() call on the semaphore goes in Finally(). (It might also be worth adding a timeout to the pipeline observable, in case it never signals OnCompleted()/OnError().)
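
The suggested timeout can be sketched with Rx's Timeout operator (assumptions: a 200 ms limit chosen arbitrarily, and Observable.Never standing in for a pipeline that never terminates; neither comes from the answer). Timeout turns a stall into an OnError, so Finally still runs and the semaphore would still be released:

```csharp
using System;
using System.Reactive.Linq;

class TimeoutDemo
{
    static void Main()
    {
        var released = false;
        try
        {
            // Observable.Never stands in for a pipeline that never terminates.
            Observable.Never<int>()
                .Timeout(TimeSpan.FromMilliseconds(200)) // turn a stall into OnError
                .Finally(() => released = true)          // runs on error as well
                .Wait();
        }
        catch (TimeoutException)
        {
            Console.WriteLine($"semaphore would be released: {released}");
        }
    }
}
```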



Edit:

As per the comments below, I've added some scheduling around semaphore access so that we don't block whoever is pushing inputs into our stream. I used an EventLoopScheduler so that all requests to access the semaphore are queued and executed on a single thread.

Edit: I prefer Paul's answer, though: simpler, less scheduling, less synchronization (Merge uses an internal queue).







