RxJava: how to .zip two Observables, then .merge them and eventually .reduce

I have the following code:

public void foo() {
    Long[] gData = new Long[] { 1L, 2L };

    rx.Observable.from(gData)
    .concatMap(data -> {

        rx.Observable<GmObject> depositObs1 = depositToUserBalance(data, 1);
        rx.Observable<GmObject> depositObs2 = depositToUserBalance(data, 2);


        return rx.Observable.zip(depositObs1, depositObs2, (depositObj1, depositObj2) -> {

            depositObj1.putNumber("seat_index", data);
            depositObj2.putNumber("seat_index", data);

            return rx.Observable.merge(
                    rx.Observable.just(depositObj1),
                    rx.Observable.just(depositObj2));
        })
    })
    .reduce(new ArrayList<Long>(), (payoutArr, payoutObj) -> {

        int seatIndex = ((GmObject) payoutObj).getNumber("seat_index").intValue();
        long payout = ((GmObject) payoutObj).getNumber("payout").longValue();
        payoutArr.add(seatIndex, payout);
        return payoutArr;
    })
    .subscribe(results -> {
        System.out.println(results);
    });
}

      

This code uses .zip to combine the two observables, adds a "seat_index" property to each result, and then calls .merge so that .reduce can aggregate all the results into an ArrayList.

There is a problem with this code: when .reduce processes its input, it receives each item as an Observable, not as a GmObject. What function can "extract" a GmObject from its Observable wrapper?

Does it make sense to use RxJava this way, or is there a better technique?

Thanks!

1 answer


The zip operator takes a lambda as its third argument. This lambda is a two-argument function that returns the object produced by combining its arguments, not an Observable of that result (the returned object can itself be an Observable, of course, but that is not what you want in your case).

So after your call to zip you have an Observable<Observable<GmObject>>, but you expect an Observable<GmObject>.
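To make the nesting concrete: a combiner that returns a container produces a container of containers unless something flattens it. The sketch below shows the same shape with java.util.stream rather than RxJava, purely as an analogy so that it runs without extra dependencies; NestingDemo and pair are hypothetical names. In RxJava the flattening counterparts are flatMap, concatMap, or the merge overload that takes an Observable of Observables.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class NestingDemo {
    // Hypothetical combiner that returns a container,
    // like the lambda passed to zip in the question.
    static Stream<String> pair(String a, String b) {
        return Stream.of(a, b);
    }

    // map keeps whatever the function returns: one nested Stream per input.
    static List<Stream<String>> nested() {
        return Stream.of("x", "y")
                .map(s -> pair(s + "1", s + "2"))
                .collect(Collectors.toList());
    }

    // flatMap unwraps each inner Stream, so downstream sees plain elements.
    static List<String> flattened() {
        return Stream.of("x", "y")
                .flatMap(s -> pair(s + "1", s + "2"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(nested().size());
        System.out.println(flattened());
    }
}
```

Applied to the question's code, this means either flattening the nested Observable before reduce or, more simply, not creating the nesting in the first place.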



I don't think zip is the operator you are looking for. You can set "seat_index" with doOnNext and merge the two observables directly:

public void foo() {
    Long[] gData = new Long[] { 1L, 2L };

    rx.Observable.from(gData)
    .concatMap(data -> {
        // Tag each emitted GmObject with its seat index as a side effect.
        rx.Observable<GmObject> depositObs1 = depositToUserBalance(data, 1)
                .doOnNext(obj -> obj.putNumber("seat_index", data));
        rx.Observable<GmObject> depositObs2 = depositToUserBalance(data, 2)
                .doOnNext(obj -> obj.putNumber("seat_index", data));

        // merge yields an Observable<GmObject>, so concatMap emits plain GmObjects.
        return rx.Observable.merge(depositObs1, depositObs2);
    })
    .reduce(new ArrayList<Long>(), (payoutArr, payoutObj) -> {
        // payoutObj is already a GmObject here, so no cast is needed.
        int seatIndex = payoutObj.getNumber("seat_index").intValue();
        long payout = payoutObj.getNumber("payout").longValue();
        payoutArr.add(seatIndex, payout);
        return payoutArr;
    })
    .subscribe(results -> System.out.println(results));
}
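
One more thing to check in the reduce step, independent of RxJava: ArrayList.add(int, E) throws IndexOutOfBoundsException when the index is greater than the current size, so inserting at seat index 1 into an empty list fails. A minimal sketch of the pitfall and one possible alternative follows. SeatPayouts and its methods are hypothetical, and summing the payouts per seat is an assumption, since the question does not say how the two deposits for a seat should be combined.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SeatPayouts {
    // Mirrors payoutArr.add(seatIndex, payout) on a freshly created list.
    static boolean insertAtIndexFails(int seatIndex, long payout) {
        List<Long> payouts = new ArrayList<>();
        try {
            payouts.add(seatIndex, payout); // throws when seatIndex > size()
            return false;
        } catch (IndexOutOfBoundsException e) {
            return true;
        }
    }

    // Alternative: key the results by seat index instead of list position.
    // Map.merge sums payouts that share a seat (an assumed combination rule).
    static Map<Integer, Long> collectBySeat(int[] seats, long[] payouts) {
        Map<Integer, Long> bySeat = new TreeMap<>();
        for (int i = 0; i < seats.length; i++) {
            bySeat.merge(seats[i], payouts[i], Long::sum);
        }
        return bySeat;
    }

    public static void main(String[] args) {
        System.out.println(insertAtIndexFails(1, 100L));
        System.out.println(collectBySeat(new int[] {1, 1, 2, 2},
                                         new long[] {10, 20, 30, 40}));
    }
}
```

A map like this could serve as the seed for reduce in place of the ArrayList, with the accumulator calling merge instead of add.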

      
