Ensure the order of updating subscribers

Is there a way to ensure that the order for updating subscribers is ensured?

I have a hot watcher and my first subscriber does some synchronization to update a variable, and my next subscriber then has to initialize the service (just once!) And only after that variable has been set!

it looks like this:

import App from './App'

var appSource = App.init() // gets the hot observable

// our second subscriber
appSource.take(1).subscribe(() => {
  // take 1 to only run this once
  nextService.init()
})

      

where App.init

looks like this:

...
init() {
  var source = this.createObservable() // returns a hot interval observable that fetches a resource every few minutes

  // first subscriber, updates the `myVar` every few minutes
  source.subscribe((data) => this.myVar = data)

  return source
}
...

      

This currently works, but I'm not sure if it will always follow the 100% order.

EDIT:

As I heard callers will be called FIFO. Thus, the order is somewhat guaranteed.

+3


source to share


1 answer


I don't know if RxJS always makes sure that observers are called in the order they subscribe. But as you say, it usually works.

However, you might consider modeling your actual workflow instead of relying on the implicit order of the observers.

It looks like you need to know when your application will be initialized so that you can take further action. Instead of relying on knowledge of internal operations App.init

, one App

can expose an API to do this:

One (non-Rx way) is to provide the caller with a callback init

:

//...
init(callback) {
  var source = this.createObservable() // returns a hot interval observable that fetches a resource every few minutes

  // first subscriber, updates the `myVar` every few minutes
  source.subscribe((data) => {
    this.myVar = data;
    if (callback) {
        callback();
        callback = undefined;
    }
  })

  return source
}

// elsewhere
App.init(() => nextService.init());

      



Another option instead of a callback is to simply init

return a Promise

, which is your solution (or Rx.AsyncSubject

which you signal) after the initialization is complete.

Another option, but requires a bit of refactoring, is to model this.myVar

as observable data. i.e:.

init() {
    this.myVar = this.createObservable().replay(1);
    this.myVar.connect();
    // returns an observable that signals when we are initialized
    return this.myVar.first();
}

// elsewhere, you end up with this pattern...
const servicesToInit = [ App, service1, service2, service3 ];
Observable
    .of(servicesToInit)
    .concatMap(s => Rx.Observable.defer(() => s.init()))
    .toArray()
    .subscribe(results => {
        // all initializations complete
        // results is an array containing the value returned by each service init observable
    });

      

Now, whatever wants to use myVar

should always subscribe to it to get current and / or future values. They could never simply query the current value synchronously.

+2


source







All Articles