Understanding data streams and multiple subscribers (using modification)

Let's say I have 2 observables (A and B) which are essentially network calls (using Retrofit for context).

The current application thread looks like this:

  • A and B start at about the same time (asynchronously).
  • B is executed 0 or more times on user interaction

I have 3 different scripts that I want to listen to for these two obsables / api calls.

  • I want to know right away when Observable A completes
  • I want to know right away when Observable B completes
  • I want to know when both finished

First, is this a good use case for RxJava?

I know how to do each scenario individually (using zip

for the latter), although I don't know how to do all of them at the same time.

If I subscribe to Observable A, A starts. If I subscribe to B, B starts. If A and B are completed before I subscribe to zip (a, b), I may miss that event and never see it in full. right?

Any general guidance would be appreciated. The knowledge of RxJava is quite subtle: P

+3


source to share


1 answer


You can achieve this by using three different observables, one for each of your cases.

As you will have to share states between each observable, you will have to convert the modified cold observables to hot observables. (see here for more information on this topic)



ConnectableObservable a = service.callA().publish(); 
ConnectableObservable b = service.callB().publish();

a.subscribe((e) -> { /* onNext */ }, (ex) -> {/* onError */},  () -> {/* when A is completed */ });
b.subscribe((e) -> { /* onNext */ }, (ex) -> {/* onError */},  () -> {/* when B is completed */ });
a.mergeWith(b).subscribe((e) -> { /* onNext */ }, (ex) -> {/* onError */},  () -> {/* when A and B are completed */ });

a.connect(); // start subscription to a
b.connect(); // start subscription to b

      

Do not share the object between onCompleted methods or you will have to deal with matching problems.

+5


source







All Articles