Javascript observables: needs switchMap functionality but with little difference

I have observed Rxjs ( stream

in the code below), which emits the observed ( subjOne

and subjTwo

). Each of the inner observables can emit their own values ​​in any order and at any time. My task is to grab the values ​​from subjOne until subjTwo emits its first value.

const subjOne = new Subject();
const subjTwo = new Subject();

const stream = Observable.create(observer => {
    observer.next(subjOne);
    observer.next(subjTwo);
});

stream
    .someOperator(subj => subj)
    .subscribe(value => console.log('Value: ', value));

      

Example 1: subjOne

emits values ​​1 and 2, then subjTwo

emits value 3, then subjOne

emits 4. The output should be: 1, 2, 3.

Example 2: subjTwo

emits 1, then subjOne

emits 2. The output should be 1.

switchMap is inappropriate here because it emits values ​​from subjOne

as soon as subjTwo

emitted from stream

. Any ideas on how to achieve this? Thank.

UPDATE: In my actual case, there are not only two internal observables - subjOne

and subjTwo

- but a constant stream of them, so hardcoding by hand is subjOne.takeUntil(subjTwo)

not a viable option.

+3


source to share


3 answers


I think this does what you want:

// scan to let us keep combining the previous observable
// with the next observable
source
  .scan((current, next) => {
    // takeUntil to stop current when next produces
    const currentUntil = current.takeUntil(next);
    // now merge current with next
    return currentUntil.merge(next)
  }, Rx.Observable.empty())
  // switch to the most recent version of the combined inner observables
  .switch();

      



Note that this will only work if the internal observables are hot. If they are cold observables, it will take a little more code to achieve this.

+3


source


It looks like you are looking takeUntil

.

takeUntil

listens on one thread until the second thread exits. So for your examples



// Emits every second
const source = Observable.timer(0, 1000);

// Emits once after 3 seconds
const canceller = Observable.timer(3000);

source
  // Takes from the source until this stream emits
  .takeUntil(canceller)
  .subscribe({
    next: x => console.log(`Value ${x}`),
    complete: () => console.log('Done')
  });
// Value 0 -> 0 seconds
// Value 1 -> 1 seconds
// Value 2 -> 2 seconds
// Done -> 3 seconds

      

0


source


You can output your two members and then combine them with takeUntil

:

const subjOne = new Subject();
const subjTwo = new Subject();

const stream = Observable.create(observer => {
    observer.next(subjOne);
    observer.next(subjTwo);
});

const first$ = stream.take(1).flatMap(x=>x)
const second$ = stream.skip(1).take(1).flatMap(x=>x)

first$.takeUntil(second$).subscribe(...)

      

0


source







All Articles