[Rx.js] When does the zip operator emit an error?

I am learning Rx.js and have a problem with the zip operator:

    var error = Rx.Observable.throw('Oop!');
    var age$ = Rx.Observable.concat(Rx.Observable.of(21, 22, 23), error);
    var sex$ = Rx.Observable.of("male", "male", "female", "female");
    var name$ = Rx.Observable.of("jack", "john", "james", "lucy");
    var example = Rx.Observable.zip(age$, sex$, name$, (age, sex, name) => ({ age, sex, name }));

      

I then subscribe to example and print each notification:

    example.subscribe({
        next: (value) => { console.log(value); },
        error: (err) => { console.log('Error: ' + err); },
        complete: () => { console.log('complete'); }
    });

      

The output is not what I expected. I expected:

    {age:21,sex:"male",name:"jack"}
    {age:22,sex:"male",name:"john"}
    {age:23,sex:"female",name:"james"}
    error

      

but I get only a single line, with no values:

    error

      

I have read the official documentation, but no chapter explains when the zip operator emits an error.

Can anyone please help? Thank you very much.


1 answer


You see the error right away because the first observable you pass emits its values synchronously. (The other observables also emit their values synchronously, but that doesn't matter in this scenario.)

zip subscribes to the observables passed to it one by one, in the order in which they are passed. Upon subscribing to the first observable, zip synchronously receives all of that observable's values and the concatenated error. Because the error arrives before zip has received any values from the other observables, no combined value is emitted. zip then emits its own error and is done.
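The ordering described above can be reproduced without RxJS. The following is only a minimal sketch under stated assumptions, not the library's implementation: `ofValues`, `throwError`, `concatSources`, and `zipSketch` are hypothetical stand-ins, and a "source" is modeled as a plain function that pushes notifications into an observer synchronously.

```javascript
// Simplified model, NOT the RxJS source. A "source" here is a hypothetical
// stand-in for an observable: a function that pushes notifications into an
// observer synchronously as soon as it is called.
const ofValues = (...values) => (observer) => {
  values.forEach((v) => observer.next(v));
  observer.complete();
};
const throwError = (err) => (observer) => observer.error(err);

// Subscribes to the next source only once the previous one has completed.
const concatSources = (...sources) => (observer) => {
  const subscribeTo = (i) => {
    if (i === sources.length) return observer.complete();
    sources[i]({
      next: (v) => observer.next(v),
      error: (e) => observer.error(e),
      complete: () => subscribeTo(i + 1),
    });
  };
  subscribeTo(0);
};

// Subscribes to the sources in order, buffers their values, and forwards the
// first error. Completion handling is omitted to keep the sketch short.
function zipSketch(sources, observer) {
  const buffers = sources.map(() => []);
  let done = false;
  sources.forEach((source, i) => {
    if (done) return; // in this model, later sources are skipped after an error
    source({
      next: (v) => {
        if (done) return;
        buffers[i].push(v);
        // A combined value needs one buffered value from every source.
        if (buffers.every((b) => b.length > 0)) {
          observer.next(buffers.map((b) => b.shift()));
        }
      },
      error: (e) => {
        if (done) return;
        done = true;
        observer.error(e);
      },
      complete: () => {},
    });
  });
}

const log = [];
zipSketch(
  [
    concatSources(ofValues(21, 22, 23), throwError("Oop!")),
    ofValues("male", "male", "female", "female"),
    ofValues("jack", "john", "james", "lucy"),
  ],
  {
    next: ([age, sex, name]) => log.push(JSON.stringify({ age, sex, name })),
    error: (err) => log.push("Error: " + err),
  }
);
console.log(log); // [ 'Error: Oop!' ]
```

In this model the first source delivers 21, 22, 23 and then the error while the other two buffers are still empty, so the error is the only notification that reaches the observer.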

In RxJS 5, if you specify the optional scheduler argument, so that the observables emit their values asynchronously, you see the behavior you expected:



    var age$ = Rx.Observable.concat(
      Rx.Observable.of(21, 22, 23, Rx.Scheduler.async),
      Rx.Observable.throw("Oop!", Rx.Scheduler.async)
    );

    var sex$ = Rx.Observable.of(
      "male", "male", "female", "female",
      Rx.Scheduler.async
    );

    var name$ = Rx.Observable.of(
      "jack", "john", "james", "lucy",
      Rx.Scheduler.async
    );

    var zipped$ = Rx.Observable.zip(
      age$, sex$, name$,
      (age, sex, name) => ({ age, sex, name })
    );
    zipped$.subscribe(
      (value) => console.log(value),
      (error) => console.log(error),
      () => console.log("complete")
    );
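To see why asynchronous emission changes the result, here is a rough model that does not use RxJS at all. It assumes a FIFO task queue can stand in for the async scheduler, with each source emitting one value per task. `schedule`, `flush`, `ofAsync`, `throwAsync`, `concatSources`, and `zipSketch` are hypothetical names for this sketch, not library APIs.

```javascript
// Simplified model, NOT RxJS: a FIFO task queue stands in for an async scheduler.
const queue = [];
const schedule = (task) => queue.push(task);
const flush = () => {
  while (queue.length > 0) queue.shift()();
};

// Emits one value per scheduled task, then completes in a further task.
const ofAsync = (...values) => (observer) => {
  const step = (i) => {
    if (i < values.length) {
      observer.next(values[i]);
      schedule(() => step(i + 1));
    } else {
      observer.complete();
    }
  };
  schedule(() => step(0));
};
const throwAsync = (err) => (observer) => schedule(() => observer.error(err));

// Subscribes to the next source only once the previous one has completed.
const concatSources = (...sources) => (observer) => {
  const subscribeTo = (i) => {
    if (i === sources.length) return observer.complete();
    sources[i]({
      next: (v) => observer.next(v),
      error: (e) => observer.error(e),
      complete: () => subscribeTo(i + 1),
    });
  };
  subscribeTo(0);
};

// Subscribes in order, buffers values, forwards the first error.
// Completion handling is omitted to keep the sketch short.
function zipSketch(sources, observer) {
  const buffers = sources.map(() => []);
  let done = false;
  sources.forEach((source, i) => {
    if (done) return;
    source({
      next: (v) => {
        if (done) return;
        buffers[i].push(v);
        // A combined value needs one buffered value from every source.
        if (buffers.every((b) => b.length > 0)) {
          observer.next(buffers.map((b) => b.shift()));
        }
      },
      error: (e) => {
        if (done) return;
        done = true;
        observer.error(e);
      },
      complete: () => {},
    });
  });
}

const log = [];
zipSketch(
  [
    concatSources(ofAsync(21, 22, 23), throwAsync("Oop!")),
    ofAsync("male", "male", "female", "female"),
    ofAsync("jack", "john", "james", "lucy"),
  ],
  {
    next: ([age, sex, name]) => log.push(JSON.stringify({ age, sex, name })),
    error: (err) => log.push("Error: " + err),
  }
);
flush(); // nothing is emitted until the queued tasks run
log.forEach((line) => console.log(line));
// {"age":21,"sex":"male","name":"jack"}
// {"age":22,"sex":"male","name":"john"}
// {"age":23,"sex":"female","name":"james"}
// Error: Oop!
```

Because all three sources are subscribed before any task runs, their emissions interleave, and three combined values are produced before the error task is reached.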
      


