RxJava: how to .zip two Observables, then .merge them and eventually .reduce
I have the following code:
public void foo() {
    Long[] gData = new Long[] { 1L, 2L };
    rx.Observable.from(gData)
        .concatMap(data -> {
            rx.Observable<GmObject> depositObs1 = depositToUserBalance(data, 1);
            rx.Observable<GmObject> depositObs2 = depositToUserBalance(data, 2);
            return rx.Observable.zip(depositObs1, depositObs2, (depositObj1, depositObj2) -> {
                depositObj1.putNumber("seat_index", data);
                depositObj2.putNumber("seat_index", data);
                return rx.Observable.merge(
                    rx.Observable.just(depositObj1),
                    rx.Observable.just(depositObj2));
            });
        })
        .reduce(new ArrayList<Long>(), (payoutArr, payoutObj) -> {
            int seatIndex = ((GmObject) payoutObj).getNumber("seat_index").intValue();
            long payout = ((GmObject) payoutObj).getNumber("payout").longValue();
            payoutArr.add(seatIndex, payout);
            return payoutArr;
        })
        .subscribe(results -> {
            System.out.println(results);
        });
}
This code uses .zip to combine the emissions of two observables, adds a "seat_index" property to each emitted object, and then calls .merge so that .reduce can eventually aggregate all results into an ArrayList.
There is a problem with this code: when .reduce processes its input, it receives an Observable, not a GmObject. What function can "extract" a GmObject from its Observable wrapper?
Does it make sense to use RxJava this way, or is there a better technique?
Thanks!
The zip operator takes a lambda as its third argument. This lambda is a two-argument function that returns an object composed from its arguments, not an Observable of that result (the object can of course itself be an Observable, but that is not what you want in your case).
So after your call to zip, you have an Observable<Observable<GmObject>>, but you expect an Observable<GmObject>.
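To make the nesting concrete without pulling in RxJava, here is a minimal sketch with plain lists. The zipLists helper is hypothetical (it is not part of RxJava or the JDK) and only mimics how a zip-style combiner works: whatever the lambda returns becomes the element type of the result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

class ZipNestingSketch {

    // Hypothetical list-based stand-in for zip: pairs elements by position and
    // stores whatever the combiner returns as one element of the result.
    static <A, B, R> List<R> zipLists(List<A> left, List<B> right, BiFunction<A, B, R> combiner) {
        List<R> out = new ArrayList<>();
        int n = Math.min(left.size(), right.size());
        for (int i = 0; i < n; i++) {
            out.add(combiner.apply(left.get(i), right.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> first = Arrays.asList("a1", "a2");
        List<String> second = Arrays.asList("b1", "b2");

        // Combiner returns a plain value, so the result is List<String>.
        List<String> flat = zipLists(first, second, (x, y) -> x + "+" + y);
        System.out.println(flat);   // [a1+b1, a2+b2]

        // Combiner returns a container, so the result is List<List<String>>,
        // the same shape as Observable<Observable<GmObject>> in the question.
        List<List<String>> nested = zipLists(first, second, (x, y) -> Arrays.asList(x, y));
        System.out.println(nested); // [[a1, b1], [a2, b2]]
    }
}
```

Returning rx.Observable.merge(...) from the zip lambda corresponds to the second call: the container ends up inside the stream instead of being flattened into it.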
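I don't think zip is the operator you are looking for. Instead, you can set seat_index on each emitted object with doOnNext and merge the two observables directly: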
public void foo() {
    Long[] gData = new Long[] { 1L, 2L };
    rx.Observable.from(gData)
        .concatMap(data -> {
            rx.Observable<GmObject> depositObs1 = depositToUserBalance(data, 1).doOnNext(obj -> obj.putNumber("seat_index", data));
            rx.Observable<GmObject> depositObs2 = depositToUserBalance(data, 2).doOnNext(obj -> obj.putNumber("seat_index", data));
            return rx.Observable.merge(depositObs1, depositObs2);
        })
        .reduce(new ArrayList<Long>(), (payoutArr, payoutObj) -> {
            int seatIndex = ((GmObject) payoutObj).getNumber("seat_index").intValue();
            long payout = ((GmObject) payoutObj).getNumber("payout").longValue();
            payoutArr.add(seatIndex, payout);
            return payoutArr;
        })
        .subscribe(results -> System.out.println(results));
}
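One thing to watch in the reduce step: ArrayList.add(index, element) throws IndexOutOfBoundsException when the index is greater than the current size. If seat_index really takes the values 1 and 2 from gData, the very first add(1, payout) on the empty list would fail. The sketch below shows that behaviour with plain Java and one possible alternative accumulator, a map from seat index to its payouts. The addPayout helper and the choice to keep a list of payouts per seat are assumptions for illustration, not something taken from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class SeatPayoutSketch {

    // Returns true if ArrayList.add(index, value) rejects the index on an empty list.
    static boolean addAtIndexFails(int seatIndex, long payout) {
        List<Long> payouts = new ArrayList<>();
        try {
            payouts.add(seatIndex, payout);
            return false;
        } catch (IndexOutOfBoundsException e) {
            return true;
        }
    }

    // Hypothetical alternative accumulator: collects every payout under its seat index.
    static Map<Integer, List<Long>> addPayout(Map<Integer, List<Long>> bySeat, int seatIndex, long payout) {
        bySeat.computeIfAbsent(seatIndex, k -> new ArrayList<>()).add(payout);
        return bySeat;
    }

    public static void main(String[] args) {
        System.out.println(addAtIndexFails(1, 100L)); // true

        Map<Integer, List<Long>> bySeat = new TreeMap<>();
        addPayout(bySeat, 1, 100L);
        addPayout(bySeat, 1, 50L);
        addPayout(bySeat, 2, 70L);
        System.out.println(bySeat); // {1=[100, 50], 2=[70]}
    }
}
```

The accumulator function has the same (state, item) -> state shape that .reduce expects, so something along these lines could replace the ArrayList seed if a per-seat grouping is what you are after.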