RxJs Dynamically add events from another EventEmitter

I have an Observable coming from an EventEmitter which is really just a http connection, streaming events.

Sometimes I have to disconnect from the main thread and reconnect. I'm not sure how to handle this with rxjs.

I'm not sure if I can populate the source and then dynamically add another source to the source code, or if I need to do something like what I have at the very bottom.

var Rx = require('rx'),
    EventEmitter = require('events').EventEmitter;

  var eventEmitter = new EventEmitter();
  var eventEmitter2 = new EventEmitter();

  var source = Rx.Observable.fromEvent(eventEmitter, 'data')

  var subscription = source.subscribe(function (data) {
    console.log('data: ' + data);
  });

  setInterval(function() {
    eventEmitter.emit('data', 'foo');
  }, 500);

  // eventEmitter stop emitting data, underlying connection closed
  // now attach seconds eventemitter (new connection)

  // something like this but obvouisly doesn't work
  source
    .fromEvent(eventEmitter2, 'data')

      

The Puesdo code, which is more than what I do, create a second streaming connection before I close the first one, so I don't "lose" any data. Here I am not sure how to stop the Observable without "playing" the records due to the onNext not being called due to the buffer.

  var streams = [], notifiers = [];

  // create initial stream
  createNewStream();

  setInterval(function() {
    if (params of stream have changed) createNewStream();
  }, $1minutes / 3);

  function createNewStream() {
    var stream = new eventEmitterStream();

    stream.once('connected', function() {
      stopOthers();

      streams.push(stream);
      createSource(stream, 'name', 'id');
    });
  }

  function stopOthers() {
    while(streams.length > 0) {
      streams.pop().stop(); // stop the old stream
    }

    while(notifiers.length > 0) {
      // if i call this, the buffer may lose records, before onNext() called
      //notifiers.pop()(Rx.Notification.createOnCompleted());
    }
  }

  function createObserver(tag) {
    return Rx.Observer.create(
      function (x) {
        console.log('Next: ', tag, x.length, x[0], x[x.length-1]);
      },
      function (err) {
        console.log('Error: ', tag, err);
      },
      function () {
        console.log('Completed', tag);
      });
  }

  function createSource(stream, event, id) {
    var source = Rx.Observable
      .fromEvent(stream, event)
      .bufferWithTimeOrCount(time, max);

    var subscription = source.subscribe(createObserver(id));
    var notifier = subscription.toNotifier();
    notifiers.push(notifier);
  }

      

+3


source to share


1 answer


First and formost, you need to make sure that you can remove all listeners from your previously "dead" emitter. Otherwise, you will create a leaky application.

It seems that the only way to know that the EventEmitter has died is to monitor the frequency, unless you have an event that fires on error or termination (for disconnections). The latter is much, much more preferable.

Regardless, Rx Secret Sauce ensures that you create and destroy the data stream in the observable . If you complete the creation of the emitter in the form you are observing, as well as the means to demolish it, you can use amazing things like the operator retry

to recreate that observable.



So, if you have no way of knowing if it died and you want to reconnect it, you can use something like this:

// I'll presume you have some function to get an EventEmitter that 
// is already set up
function getEmitter() {
  var emitter = new EventEmitter();
  setInterval(function(){
    emitter.emit('data', 'foo');
  }, 500)
  return emitter;
}


var emitterObservable = Observable.create(function(observer) {
  // setup the data stream
  var emitter = getEmitter();
  var handler = function(d) {
    observer.onNext(d);
  };
  emitter.on('data', handler);

  return function() {
    // tear down the data stream in your disposal function
    emitter.removeListener('on', handler);
  };
});

// Now you can do Rx magic!
emitterObservable
  // if it doesn't emit in 700ms, throw a timeout error
  .timeout(700)
  // catch all* errors and retry
  // this means the emitter will be torn down and recreated 
  // if it times out!
  .retry()
  // do something with the values
  .subscribe(function(x) { console.log(x); });

      

* NOTE. Retry throws all errors, so you can add catch

above it to handle non-timeout errors. Up to you.

+8


source







All Articles