Merging observables
Let's say I am developing a chat application. I have an observable threads$ that emits an array of threads every n seconds, an observable offline$ that notifies when a thread goes offline, and an observable online$ that notifies when a thread comes online:
enum ConnectionStatus { Offline = 0, Online }

interface Thread {
  id: string;
  status: ConnectionStatus
}

const threads$ = Observable
  .interval(n)
  .switchMap(() => Observable.create((observer: Observer<Array<Thread>>) =>
    getThreads((threads: Array<Thread>) => observer.next(threads))));

const online$ = Observable.create((observer: Observer<Thread>) =>
  onOnline((threadId: string) => observer.next({
    id: threadId,
    status: ConnectionStatus.Online
  })));

const offline$ = Observable.create((observer: Observer<Thread>) =>
  onOffline((threadId: string) => observer.next({
    id: threadId,
    status: ConnectionStatus.Offline
  })));
I want to combine these streams according to this rule: threads$ must emit an array every n seconds, but whenever online$ or offline$ emits, I want to take the latest value (Array<Thread>) from threads$, map it to change the status of the one affected thread, and immediately emit the mapped collection.
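The mapping step in that rule can be sketched as a plain function, independent of any Rx operator. This is only an illustration: applyStatus is a hypothetical name that does not appear in the post, and the sample data is made up. It returns a new array and leaves the input untouched.

```typescript
enum ConnectionStatus { Offline = 0, Online }

interface Thread {
  id: string;
  status: ConnectionStatus;
}

// Hypothetical helper (not from the original post): returns a new array in
// which the thread whose id matches update.id carries update.status.
// Other threads are passed through as-is; the input array is not mutated.
function applyStatus(threads: Thread[], update: Thread): Thread[] {
  return threads.map(t =>
    t.id === update.id ? { ...t, status: update.status } : t
  );
}

// Made-up sample data standing in for the latest value of threads$.
const latest: Thread[] = [
  { id: "a", status: ConnectionStatus.Offline },
  { id: "b", status: ConnectionStatus.Offline },
];

const patched = applyStatus(latest, { id: "b", status: ConnectionStatus.Online });
console.log(patched.map(t => `${t.id}:${ConnectionStatus[t.status]}`).join(" "));
// prints "a:Offline b:Online"
```

A function of this shape could serve as the projection passed to whichever combining operator ends up being used.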
I have lost track of the Rx operators combineLatest, mergeMap, zip and the like, so I would be grateful if someone could help me implement this combination in the most idiomatic Rx way.
This should emit an Array<Thread> every time threads$ emits, and again immediately after each online$ or offline$ emission.
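const threadUpdate$ = Observable.merge(
  threads$,
  Observable.merge(online$, offline$)
    .withLatestFrom(threads$,
      (thread, threads) => threads.map(t =>
        // The map callback must return every element; the matching
        // thread is copied with its new status rather than mutated.
        t.id === thread.id ? { ...t, status: thread.status } : t)));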
Note that threads$ keeps emitting on its own schedule, so it may emit at the same time as the merged online$ / offline$ stream.
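To make the emission order of the merge plus withLatestFrom combination easier to follow, here is a rough model of it in plain TypeScript, without RxJS. FeedEvent and replay are hypothetical names introduced only for this sketch, and the events are made up. The model assumes two properties of withLatestFrom: a status change that arrives before threads$ has emitted anything is dropped, and each status change is applied to the last polled list, not to the previously patched one.

```typescript
enum ConnectionStatus { Offline = 0, Online }

interface Thread {
  id: string;
  status: ConnectionStatus;
}

// Hypothetical event model: a full list from polling, or one status change.
type FeedEvent =
  | { kind: "list"; threads: Thread[] }
  | { kind: "status"; update: Thread };

// Replays events in arrival order and returns every array that would be emitted.
function replay(events: FeedEvent[]): Thread[][] {
  const emitted: Thread[][] = [];
  let latest: Thread[] | undefined; // last list seen from polling
  for (const e of events) {
    if (e.kind === "list") {
      latest = e.threads;
      emitted.push(latest);
    } else if (latest !== undefined) {
      const update = e.update;
      // Patch a copy of the last polled list; `latest` itself stays as polled.
      emitted.push(latest.map(t =>
        t.id === update.id ? { ...t, status: update.status } : t));
    }
    // A status change seen before any list has arrived falls through
    // both branches and is dropped.
  }
  return emitted;
}

const out = replay([
  { kind: "status", update: { id: "a", status: ConnectionStatus.Online } }, // dropped
  { kind: "list", threads: [
    { id: "a", status: ConnectionStatus.Offline },
    { id: "b", status: ConnectionStatus.Offline },
  ] },
  { kind: "status", update: { id: "a", status: ConnectionStatus.Online } },
  { kind: "status", update: { id: "b", status: ConnectionStatus.Online } },
]);

console.log(out.map(l => l.map(t => ConnectionStatus[t.status]).join(",")).join(" | "));
// prints "Offline,Offline | Online,Offline | Offline,Online"
```

In this model the third emission shows thread a as offline again, because the change to b was applied to the polled list. If status changes need to accumulate between polls, the patched list would have to be carried forward as state instead.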
I think you can do it using multicast():
const stop$ = Observable.merge(online$, offline$);

threads$
  .multicast(new Subject(), obs => Observable.merge(
    obs,
    obs.takeUntil(stop$).takeLast(1).map(...)))
  .subscribe(...);
I obviously haven't tested it, but maybe it will nudge you in the right direction.