Javascript observables: needs switchMap functionality but with little difference
I have observed Rxjs ( stream
in the code below), which emits the observed ( subjOne
and subjTwo
). Each of the inner observables can emit their own values ββin any order and at any time. My task is to grab the values ββfrom subjOne until subjTwo emits its first value.
const subjOne = new Subject();
const subjTwo = new Subject();
const stream = Observable.create(observer => {
observer.next(subjOne);
observer.next(subjTwo);
});
stream
.someOperator(subj => subj)
.subscribe(value => console.log('Value: ', value));
Example 1:
subjOne
emits values ββ1 and 2, then subjTwo
emits value 3, then subjOne
emits 4. The output should be: 1, 2, 3.
Example 2:
subjTwo
emits 1, then subjOne
emits 2. The output should be 1.
switchMap is inappropriate here because it emits values ββfrom subjOne
as soon as subjTwo
emitted from stream
. Any ideas on how to achieve this? Thank.
UPDATE: In my actual case, there are not only two internal observables - subjOne
and subjTwo
- but a constant stream of them, so hardcoding by hand is subjOne.takeUntil(subjTwo)
not a viable option.
source to share
I think this does what you want:
// scan to let us keep combining the previous observable
// with the next observable
source
.scan((current, next) => {
// takeUntil to stop current when next produces
const currentUntil = current.takeUntil(next);
// now merge current with next
return currentUntil.merge(next)
}, Rx.Observable.empty())
// switch to the most recent version of the combined inner observables
.switch();
Note that this will only work if the internal observables are hot. If they are cold observables, it will take a little more code to achieve this.
source to share
It looks like you are looking takeUntil
.
takeUntil
listens on one thread until the second thread exits. So for your examples
// Emits every second
const source = Observable.timer(0, 1000);
// Emits once after 3 seconds
const canceller = Observable.timer(3000);
source
// Takes from the source until this stream emits
.takeUntil(canceller)
.subscribe({
next: x => console.log(`Value ${x}`),
complete: () => console.log('Done')
});
// Value 0 -> 0 seconds
// Value 1 -> 1 seconds
// Value 2 -> 2 seconds
// Done -> 3 seconds
source to share
You can output your two members and then combine them with takeUntil
:
const subjOne = new Subject();
const subjTwo = new Subject();
const stream = Observable.create(observer => {
observer.next(subjOne);
observer.next(subjTwo);
});
const first$ = stream.take(1).flatMap(x=>x)
const second$ = stream.skip(1).take(1).flatMap(x=>x)
first$.takeUntil(second$).subscribe(...)
source to share