RxJava - zip list of observables
I have a list of Observables (RxJava 1).
List<Observable> observableList = new ArrayList<>();
It can contain at least 1 observable. Each of them has the same result type.
How can I consolidate the results of all observables?
I was thinking about the zip-operator, but it doesn't support List and I don't know the number of observables (it could be 1,2,3,4 ....)
source to share
You can use a static method . zip(java.lang.Iterable<? extends Observable<?>> ws,FuncN<? extends R> zipFunction)
It's a zip
method that takes Iterable
of Observable
and (which takes a varargs parameter for its method ) and uses it to combine the corresponding emitted into a result that will be omitted by the new return . FuncN
call
Object
Observable
So, for example, you could do:
Observable.zip(observableList, new FuncN(){
public ReturnType call(java.lang.Object... args){
ReturnType result; //to be made
//preparatory code for using the args
for (Object obj : args){
ReturnType retObj = (ReturnType)obj;
//code to use the arg once at a time to combine N of them into one.
}
return result;
}
});
source to share
ReactiveX - Zip operator
Zip outside BiFunction
Zip concatenates the outliers of multiple observables together through a given function and highlights separate elements for each combination based on the results of that function
Here list is an array of lists of observable types that you want to pass.
val list = arrayListOf<Observable<ImageUrlResponse>>()
Observable.zip(list) { args -> Arrays.asList(args) }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
val response = it[0]
val imageUrlResponse = imageUrlObject as ImageUrlResponse
urls.add(imageUrlResponse.imageUrl)}
}, {
val c = it
})
The result of the next subscription is this image below. Just as we expect this to be packaged together . Also you may notice that it returns all responses, which will be zipped into one java.lang.Object [] .
Note that to access my only object, I had to type cast my list of arrays because it is of type Any!
source to share
You can .stream
and .reduce
your List
up- .zip
ping this:
final Observable<YourType> result = observableList.stream()
.reduce((observable1, observable2) ->
Observable.zip(observable1, observable2, YourTypeCombiner::methodToCombineTwoObjectsOfYourType))
.orElse(Observable.just(defaultValueForYourType));
for example the result will give new Integer(10)
final List<Observable<Integer>> observableList = ImmutableList.of(
Observable.just(1), Observable.just(2), Observable.just(3), Observable.just(4));
final Observable<Integer> result = observableList.stream()
.reduce((num1, observable2) ->
Observable.zip(observable1, observable2, Integer::sum))
.orElse(Observable.just(0));
Error handling is not enabled.
source to share