RxJS: how to ignore an error with catch and keep going

Hi, I have the following code and would like to know how to prevent the main (upstream) Observable from being terminated when an error occurs.

How can I change the following code so that all digits except "4" are displayed?

I'm looking for a general pattern that will work in other cases with different operators. This is the simplest case I could think of.

const Rx = require('rxjs/Rx');

function checkValue(n) {
  if(n === 4) {
    throw new Error("Bad value");
  }
  return true;
}
const source = Rx.Observable.interval(100).take(10);

source.filter(x => checkValue(x))
  .catch(err => Rx.Observable.empty())
  .subscribe(v => console.log(v));

      



2 answers


You want to keep the source observable running, but if an error is raised in the main event stream, the whole observable chain is torn down and you will no longer receive items.

The solution is to create a separate inner stream where you can filter and catch the error without letting the upstream pipe collapse.

const Rx = require('rxjs/Rx');
function checkValue(n) {
  if(n === 4) {
    throw new Error("Bad value");
  }
  return true;
}
const source = Rx.Observable.interval(100).take(10);

source
  // pass each item into the projection function of the switchMap operator
  .switchMap(x => {
    // create a new inner stream of just one item;
    // this stream is created for every item emitted by the source observable
    return Rx.Observable.of(x)
      // now run the filter
      .filter(checkValue)
      // catch the error here, within the projection function;
      // on error this inner pipe will collapse, but that is OK because it starts
      // within this function and will not affect the source
      // the downstream operators never see the error, so they are not affected either
      .catch(err => Rx.Observable.empty());
  })
  .subscribe(v => console.log(v));
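
Since the question asks for a reusable pattern, a complementary option is worth sketching for the case where the failing function is synchronous, as `checkValue` is: wrap it so the error never enters the stream at all. This is not part of RxJS. `tolerant` is a hypothetical helper name, and a plain array stands in for the Observable so the snippet runs without any dependency.

```javascript
// Hypothetical helper (not an RxJS API): wraps a function that may throw
// and returns `fallback` instead of letting the error propagate.
function tolerant(fn, fallback) {
  return function (...args) {
    try {
      return fn(...args);
    } catch (err) {
      return fallback;
    }
  };
}

function checkValue(n) {
  if (n === 4) {
    throw new Error("Bad value");
  }
  return true;
}

// A plain array stands in for the stream here; the same wrapped predicate
// could be passed to an Observable's filter operator.
const digits = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
const kept = digits.filter(tolerant(checkValue, false));
console.log(kept); // every digit except 4
```

With the code from the question this would read `source.filter(tolerant(checkValue, false))`. Unlike the inner-stream approach, it only covers synchronous throws inside the wrapped function.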

      



You can also use the second argument passed to the catch selector to restart the source observable, but this resubscribes to it as if it had never run before.

const Rx = require('rxjs/Rx');

function checkValue(n) {
  if(n === 4) {
    throw new Error("Bad value");
  }
  return true;
}
const source = Rx.Observable.interval(100).take(10);

source.filter(x => checkValue(x))
  .catch((err, source) => source)
  .subscribe(v => console.log(v));

      

But this does not give the desired effect. You will see a stream emitting 0..3 over and over until the end of time, or until you exit the script, whichever comes first. (This is essentially what .retry() does.)
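
Why the values repeat can be illustrated without RxJS. In the rough model below, a generator factory stands in for a cold observable: each call starts counting from 0 again, just as each new subscription to `interval` does. `runWithResubscribe` is a hypothetical helper that restarts the source after an error, capped at a fixed number of restarts so that the script terminates.

```javascript
// Rough model only: none of these names are RxJS APIs.
// Each call to makeSource() starts a fresh sequence 0..9, like a new
// subscription to a cold observable.
function* makeSource() {
  for (let i = 0; i < 10; i++) {
    yield i;
  }
}

function checkValue(n) {
  if (n === 4) {
    throw new Error("Bad value");
  }
  return true;
}

// Hypothetical helper: restart the source on error, at most maxRestarts times.
function runWithResubscribe(factory, predicate, maxRestarts) {
  const seen = [];
  for (let attempt = 0; attempt <= maxRestarts; attempt++) {
    try {
      for (const value of factory()) {
        if (predicate(value)) {
          seen.push(value);
        }
      }
      return seen; // the source completed without an error
    } catch (err) {
      // fall through to the next attempt, which starts again from 0
    }
  }
  return seen;
}

const seen = runWithResubscribe(makeSource, checkValue, 2);
console.log(seen); // [0, 1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3]
```

The run never gets past 3, because every restart replays the sequence from the beginning and hits the failing value again.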



You need to do the filtering inside a flatMap operator. In this example I use Observable.if() inside flatMap for the filtering, since it guarantees that I always return an Observable. I'm sure you can do it other ways, but to me this is a clean implementation.



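const Rx = require('rxjs/Rx');

const source = Rx.Observable.interval(100).take(10).flatMap(x =>
  // emit x when it is valid, otherwise throw inside the inner stream
  Rx.Observable.if(
    () => x !== 4,
    Rx.Observable.of(x),
    Rx.Observable.throw("Bad value")
  )
    // swallow the error so it never reaches the outer stream
    .catch(err => Rx.Observable.empty())
);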

source.subscribe(v => console.log(v));

      
