Merging observables

Let's say I am developing a chat application. I have an observable threads$ that emits an array of threads every n seconds, an observable offline$ that notifies when a thread goes offline, and an observable online$ that notifies when a thread comes online:

enum ConnectionStatus { Offline = 0, Online }

interface Thread {
    id: string;
    status: ConnectionStatus
}

const threads$ = Observable
    .interval(n)
    .switchMap(() => Observable.create((observer: Observer<Array<Thread>>) =>
        getThreads((threads: Array<Thread>) => observer.next(threads))));

const online$ = Observable.create((observer: Observer<Thread>) =>
    onOnline((threadId: string) => observer.next({
        id: threadId,
        status: ConnectionStatus.Online
    })));

const offline$ = Observable.create((observer: Observer<Thread>) =>
    onOffline((threadId: string) => observer.next({
        id: threadId,
        status: ConnectionStatus.Offline
    })));

      

I want to combine these streams according to this rule: threads$ must emit an array every n seconds, but whenever online$ or offline$ emits, I want to take the last value (Array<Thread>) of threads$ and map it, changing the status of one thread and immediately emitting the mapped collection.
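The mapping step on its own can be written as a pure function, independent of how the streams end up being combined. The following is a minimal sketch: the name `applyStatus` is hypothetical, and it assumes that an update for a thread missing from the array should simply leave the array unchanged.

```typescript
enum ConnectionStatus { Offline = 0, Online }

interface Thread {
    id: string;
    status: ConnectionStatus;
}

// Hypothetical helper: returns a new array in which the thread matching
// `update.id` carries the new status. Other threads are passed through
// untouched, and the input array is not mutated.
function applyStatus(threads: Thread[], update: Thread): Thread[] {
    return threads.map(t =>
        t.id === update.id ? { id: t.id, status: update.status } : t);
}

const snapshot: Thread[] = [
    { id: "a", status: ConnectionStatus.Offline },
    { id: "b", status: ConnectionStatus.Online },
];
const updated = applyStatus(snapshot, { id: "a", status: ConnectionStatus.Online });
```

Returning a copy rather than mutating the matching thread keeps earlier emissions intact for any subscriber that still holds a reference to them.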

I got lost trying Rx's combineLatest, mergeMap, zip and the like, so I would be grateful if someone could help me implement the combination for this case (in the most Rx-like way possible).



2 answers


This should emit an Array<Thread> every time threads$ emits, and also immediately after online$ or offline$ emits.

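const threadUpdate$ = Observable.merge(
    threads$,
    Observable.merge(online$, offline$)
        .withLatestFrom(threads$,
            (thread, threads) => threads.map(t =>
                // The callback must return a value for every element;
                // the matching thread is copied with its new status.
                t.id === thread.id ? { ...t, status: thread.status } : t)));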

      



Note that threads$ will keep emitting on its own schedule, and may emit at almost the same time as the merged online$ / offline$ stream.
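To make that interleaving concrete, here is a small Rx-free simulation of what a merge of the polling stream with a withLatestFrom-patched status stream emits for a given timeline. It is only a sketch: the `ChatEvent` type and the `replay` function are hypothetical names introduced for illustration, and real timing is reduced to the order of events in an array.

```typescript
enum ConnectionStatus { Offline = 0, Online }

interface Thread { id: string; status: ConnectionStatus; }

// Hypothetical event model: either a full poll result or a single status change.
type ChatEvent =
    | { kind: "threads"; threads: Thread[] }
    | { kind: "status"; thread: Thread };

// Replays a timeline in order and collects what the merged stream would emit.
function replay(events: ChatEvent[]): Thread[][] {
    const emitted: Thread[][] = [];
    let latest: Thread[] | undefined; // last value seen from the polling stream
    for (const e of events) {
        if (e.kind === "threads") {
            latest = e.threads;
            emitted.push(e.threads); // poll results pass straight through
        } else if (latest !== undefined) {
            const update = e.thread;
            // Patch a copy of the latest poll result; `latest` itself is unchanged.
            emitted.push(latest.map(t =>
                t.id === update.id ? { id: t.id, status: update.status } : t));
        }
        // A status change that arrives before the first poll is dropped,
        // which is how withLatestFrom behaves when its input has not emitted yet.
    }
    return emitted;
}

const off = ConnectionStatus.Offline;
const on = ConnectionStatus.Online;
const out = replay([
    { kind: "status", thread: { id: "a", status: on } },      // dropped: no poll yet
    { kind: "threads", threads: [{ id: "a", status: off }] }, // emitted as-is
    { kind: "status", thread: { id: "a", status: on } },      // patched copy emitted
    { kind: "threads", threads: [{ id: "a", status: off }] }, // next poll replaces the patch
]);
```

In this model each status change is applied to the most recent poll result rather than to the previously patched array, so two status changes between polls do not accumulate, and a poll that still reports the old status will override a patch until the backend catches up.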



I think you can do it using multicast():

const stop$ = Observable.merge(online$, offline$);
threads$
    .multicast(new Subject(), obs => Observable.merge(obs, obs.takeUntil(stop$).takeLast(1).map(...)))
    .subscribe(...);

      



I obviously haven't tested it, but maybe it will nudge you in the right direction.
