Subscribe to observable with rx-angular

I would like to use rx-angular for a simple refresh button for results. If the user clicks the refresh button, the results are reloaded. If the user clicks the refresh button 100 times in 1 second, only the latest results are loaded. If loading the results fails for some reason, that should not stop the refresh button from working.

To achieve the last point, I would like to keep the subscription (or resubscribe) even if it failed, but I cannot work out how to do that.

This doesn't work, but here's a simple example where I try to resubscribe on error:

var refreshObs = $scope.$createObservableFunction('refresh');

var doSubscribe = function () {
  refreshObs
    .select(function (x, idx, obs) {
      // get the results.
      // in here might throw an exception
    })
    .switch()
    .subscribe(
      function (x) { /*show the results*/ }, // on next
      function (err) { // on error
        doSubscribe(); // re-subscribe
      },
      function () { } // on complete
    );
};
doSubscribe();

      

I suppose this is common enough that there must be some standard practice to achieve this?

UPDATE

Using the suggested solution, this is what I did for testing:

// using angularjs and the rx.lite.js library
var testCount = 0;
var obsSubject = new rx.Subject(); // note. rx is injected but is really Rx
$scope.refreshButton = function () { // click runs this
  obsSubject.onNext();
};

obsSubject.map(function () {
  testCount++;
  if (testCount % 2 === 0) {
      throw new Error("something to catch");
  }
  return 1;
})
.catch(function (e) {
  return rx.Observable.return(1);
})
.subscribe(
    function (x) {
    // do something with results
});

      

And these are my test results:

  • Refresh button pressed
  • obsSubject.onNext() is called
  • The map function returns 1
  • The subscribe onNext fires
  • Refresh button pressed
  • obsSubject.onNext() is called
  • The map function throws an error
  • The catch function is entered
  • The subscribe onNext fires
  • Refresh button pressed
  • obsSubject.onNext() is called
  • Nothing happens. I need to keep my subscription

My understanding is that catch should keep the subscription alive, but my testing indicates that it does not. Why?


1 answer


Based on the context provided in your comment, you want:

  • Each refresh button click to trigger a "get results"
  • Every error to be displayed to the user

You don't really need to resubscribe. It is an anti-pattern, because code in Rx never depends on it, and the extra recursive call just confuses the reader. It is also reminiscent of callback hell.

In this case, you should:

  • Remove the calls to doSubscribe() because you don't need them. With this code, every refresh click already triggers a new "get results".
  • Replace select().switch() with .flatMap() (or .flatMapLatest()). When you do select(), the result is a metastream (a stream of streams), and you use switch() to flatten the metastream into a stream. flatMapLatest() does exactly that in a single operation; flatMap() also flattens, but it merges every inner stream instead of keeping only the latest one. You can also think of flatMap as .then() for JS Promises.
  • Include a .catch() operator, which handles your error the way a catch block does. The reason you can't get more results after an error is that an Observable always terminates on an error or on a "completed" event. With the catch() operator, we can replace the error with normal events from another Observable, so that the stream continues with that Observable.
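To make the "only the latest results" behaviour concrete, here is a minimal plain-JavaScript sketch of what switch() or flatMapLatest() achieves, written without RxJS so that it runs on its own. The names createLatestOnly, fakeFetch and pending are hypothetical helpers for illustration. The real operators also dispose of the stale inner subscription; this sketch only ignores stale responses.

```javascript
// Hand-rolled illustration of "latest wins"; NOT the RxJS implementation.
// createLatestOnly, fakeFetch and pending are hypothetical names.
function createLatestOnly(fetch, onResult) {
  var latestId = 0; // id of the most recent refresh request
  return function refresh() {
    var id = ++latestId;
    fetch(function (result) {
      // Deliver a response only if no newer request was started meanwhile.
      if (id === latestId) {
        onResult(result);
      }
    });
  };
}

// Fake asynchronous fetch: it stores the callbacks so that the
// responses can be delivered later, by hand.
var pending = [];
function fakeFetch(callback) {
  pending.push(callback);
}

var shown = [];
var refresh = createLatestOnly(fakeFetch, function (result) {
  shown.push(result);
});

// Three quick clicks, then the three responses arrive in order.
refresh();
refresh();
refresh();
pending[0]('results for click 1');
pending[1]('results for click 2');
pending[2]('results for click 3');

console.log(shown); // only the response to the last click is kept
```

The first two responses are dropped because a newer click had already superseded them by the time they arrived.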

To improve the code:



var refreshObs = $scope.$createObservableFunction('refresh');

refreshObs
  .flatMapLatest(function (x, idx, obs) {
    // get the results.
    // in here might throw an exception
    // should return an Observable of the results
  })
  .catch(function(e) {
      // do something with the error
      return Rx.Observable.empty(); // replace the error with nothing
  })
  .subscribe(function (x) { 
      // on next
  });

      

Note that I removed the onError and onComplete handlers as there is nothing to do inside them.
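The test results in the question's update follow from the termination rule: once catch() has swapped in a replacement Observable and that replacement completes, the whole chain is finished. The plain-JavaScript simulation below does not use RxJS, and its helper names (createClicks, makeFlakySelector, outerCatch, perClickCatch) are hypothetical. It sketches the difference between replacing the failed stream once and handling the error per click. With RxJS itself, the per-click variant would roughly correspond to applying catch() to the inner Observable returned inside flatMapLatest(); treat that mapping as an assumption to verify rather than as part of the original answer.

```javascript
// Simulation of the Rx termination rule; NOT the RxJS implementation.
// createClicks, makeFlakySelector, outerCatch and perClickCatch are hypothetical.
function createClicks() {
  var listeners = [];
  return {
    click: function () {
      listeners.slice().forEach(function (listener) { listener(); });
    },
    listen: function (listener) {
      listeners.push(listener);
      return function stopListening() {
        listeners.splice(listeners.indexOf(listener), 1);
      };
    }
  };
}

// Selector that fails on every second call, as in the question's test.
function makeFlakySelector() {
  var count = 0;
  return function () {
    count++;
    if (count % 2 === 0) { throw new Error('something to catch'); }
    return 1;
  };
}

// Like map(...).catch(...): the first error ends the original stream, the
// replacement emits once and completes, and nothing is delivered afterwards.
function outerCatch(clicks, selector, log) {
  var stop = clicks.listen(function () {
    try {
      log.push('next ' + selector());
    } catch (e) {
      stop();                // the errored stream is torn down
      log.push('next 1');    // value from the replacement stream
      log.push('completed'); // the replacement completes, so the chain ends
    }
  });
}

// Error handled per click: the listener on the clicks is never torn down.
function perClickCatch(clicks, selector, log) {
  clicks.listen(function () {
    try {
      log.push('next ' + selector());
    } catch (e) {
      log.push('handled ' + e.message);
    }
  });
}

var outerLog = [];
var innerLog = [];
var a = createClicks();
var b = createClicks();
outerCatch(a, makeFlakySelector(), outerLog);
perClickCatch(b, makeFlakySelector(), innerLog);
for (var i = 0; i < 3; i++) { a.click(); b.click(); }

console.log(outerLog); // next 1, next 1, completed (third click is lost)
console.log(innerLog); // next 1, handled something to catch, next 1
```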

Also take a look at the other operators. For example, retry() can be used to automatically "get results" again whenever an error happens. See https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/retry.md

Use retry() in conjunction with do() to handle the error (do) and to let the subscriber automatically resubscribe to the source observable (retry).

refreshObs
  .flatMapLatest(function (x, idx, obs) {
    // get the results.
    // in here might throw an exception
    // should return an Observable of the results
  })
  .do(function(){}, // noop for onNext
  function(e) {
      // do something with the error
  })
  .retry()
  .subscribe(function (x) { 
      // on next
  });

      

See a working example here: http://jsfiddle.net/staltz/9wd13gp9/9/
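As a rough picture of what the do() plus retry() combination achieves, the following plain-JavaScript sketch (again without RxJS, with the hypothetical helpers createClicks and listenWithRetry) reports the error as a side effect and then attaches a fresh listener to the same click source, so that later clicks keep producing results.

```javascript
// Sketch of "report the error, then resubscribe"; NOT the RxJS implementation.
// createClicks and listenWithRetry are hypothetical helpers.
function createClicks() {
  var listeners = [];
  return {
    click: function () {
      listeners.slice().forEach(function (listener) { listener(); });
    },
    listen: function (listener) {
      listeners.push(listener);
      return function stopListening() {
        listeners.splice(listeners.indexOf(listener), 1);
      };
    }
  };
}

function listenWithRetry(clicks, selector, onError, onNext) {
  function attach() {
    var stop = clicks.listen(function () {
      var result;
      try {
        result = selector();
      } catch (e) {
        stop();     // the failed subscription ends
        onError(e); // side effect on error, the role do() plays
        attach();   // subscribe to the source again, the role retry() plays
        return;
      }
      onNext(result);
    });
  }
  attach();
}

var count = 0;
var results = [];
var errors = [];
var clicks = createClicks();
listenWithRetry(
  clicks,
  function () {
    count++;
    if (count % 2 === 0) { throw new Error('fetch failed'); }
    return 'results ' + count;
  },
  function (e) { errors.push(e.message); },
  function (x) { results.push(x); }
);

// Four clicks: the second and fourth fail, the others still deliver results.
for (var i = 0; i < 4; i++) { clicks.click(); }

console.log(results); // results 1, results 3
console.log(errors);  // fetch failed, fetch failed
```

Because the click source is hot, resubscribing does not replay earlier clicks; it only picks up the clicks that come after the failure.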
