RxJava - zip list of observables

I have a list of Observables (RxJava 1).

List<Observable> observableList = new ArrayList<>();

      

It contains at least one observable. Each of them has the same result type.

How can I consolidate the results of all observables?

I was thinking about the zip operator, but it doesn't seem to support a List, and I don't know the number of observables in advance (it could be 1, 2, 3, 4, ...).



3 answers


You can use the static method zip(java.lang.Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction).

It is a zip method that takes an Iterable of Observables and a FuncN (which takes a varargs parameter for its call method), and uses the FuncN to combine the corresponding emitted Objects into the result that is emitted by the newly returned Observable.

So, for example, you could do:

Observable.zip(observableList, new FuncN<ReturnType>() {
    @Override
    public ReturnType call(Object... args) {
        ReturnType result = null; // placeholder: build the combined value here
        // preparatory code for using the args
        for (Object obj : args) {
            ReturnType retObj = (ReturnType) obj; // each argument arrives as Object
            // code to use the args one at a time, combining N of them into one
        }
        return result;
    }
});
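To make the role of the varargs combiner concrete, here is a minimal plain-Java sketch of the same idea that runs without RxJava. It is only a model: CombinerN is a hypothetical stand-in with the same shape as FuncN, ZipSketch and zipAll are names made up for this illustration, and ordinary lists play the part of the observables. It pairs up the i-th value of every source and stops at the shortest source, which is how zip behaves.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in with the same shape as RxJava 1's FuncN:
// one call method that receives all inputs as Object varargs.
interface CombinerN<R> {
    R call(Object... args);
}

class ZipSketch {
    // Combines the i-th element of every source into one result and
    // stops as soon as the shortest source runs out.
    static <R> List<R> zipAll(List<? extends List<?>> sources, CombinerN<R> combiner) {
        List<R> results = new ArrayList<>();
        if (sources.isEmpty()) {
            return results;
        }
        int shortest = Integer.MAX_VALUE;
        for (List<?> source : sources) {
            shortest = Math.min(shortest, source.size());
        }
        for (int i = 0; i < shortest; i++) {
            Object[] row = new Object[sources.size()];
            for (int s = 0; s < sources.size(); s++) {
                row[s] = sources.get(s).get(i);
            }
            results.add(combiner.call(row));
        }
        return results;
    }

    public static void main(String[] args) {
        List<List<Integer>> sources = Arrays.asList(
                Arrays.asList(1, 2, 3),
                Arrays.asList(10, 20, 30),
                Arrays.asList(100, 200)); // shortest source has two values

        List<Integer> sums = zipAll(sources, values -> {
            int sum = 0;
            for (Object value : values) {
                sum += (Integer) value; // cast needed: inputs are plain Objects
            }
            return sum;
        });
        System.out.println(sums); // prints [111, 222]
    }
}
```

The cast inside the combiner is the same one the FuncN version needs, because the varargs signature erases the element type.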

      



ReactiveX - Zip operator

Zip beyond BiFunction

Zip combines the emissions of multiple Observables together via a given function and emits a single item for each combination, based on the results of that function.

Here, list is an ArrayList of Observables of whatever type you want to pass.



val list = arrayListOf<Observable<ImageUrlResponse>>()


// The spread operator (*) passes each zipped value as its own list element
Observable.zip(list) { args -> Arrays.asList(*args) }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({
        val response = it[0]
        // elements are typed Any, so a cast is required
        val imageUrlResponse = response as ImageUrlResponse
        urls.add(imageUrlResponse.imageUrl)
    }, {
        val c = it
    })

      

The result of the subscription is shown in the image below. As expected, the responses are zipped together. You may also notice that all of the responses are returned zipped into a single java.lang.Object[].

Note that to access a single object I had to cast the list element, because it is of type Any!

enter image description here



You can .stream and .reduce your List, .zip-ping it pairwise like this:

final Observable<YourType> result = observableList.stream()
        .reduce((observable1, observable2) ->
                Observable.zip(observable1, observable2, YourTypeCombiner::methodToCombineTwoObjectsOfYourType))
        .orElse(Observable.just(defaultValueForYourType));


      

For example, this result will emit the Integer 10:

final List<Observable<Integer>> observableList = ImmutableList.of(
        Observable.just(1), Observable.just(2), Observable.just(3), Observable.just(4));

final Observable<Integer> result = observableList.stream()
        .reduce((observable1, observable2) ->
                Observable.zip(observable1, observable2, Integer::sum))
        .orElse(Observable.just(0));

      

Error handling is not included.
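The pairwise reduce pattern can be tried without RxJava on the classpath. The sketch below swaps in java.util.concurrent.CompletableFuture for Observable and thenCombine for the two-argument zip, so it is an analogy rather than the RxJava code itself; PairwiseReduceSketch and sumAll are names invented for this illustration. It shows why the orElse fallback is needed: reduce without an identity returns an empty Optional when the list is empty.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class PairwiseReduceSketch {
    // Folds the list two elements at a time. An empty list falls back to a
    // completed future holding defaultValue, in the same spirit as
    // orElse(Observable.just(defaultValueForYourType)).
    static CompletableFuture<Integer> sumAll(List<CompletableFuture<Integer>> futures,
                                             int defaultValue) {
        return futures.stream()
                .reduce((left, right) -> left.thenCombine(right, Integer::sum))
                .orElse(CompletableFuture.completedFuture(defaultValue));
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
                CompletableFuture.completedFuture(1),
                CompletableFuture.completedFuture(2),
                CompletableFuture.completedFuture(3),
                CompletableFuture.completedFuture(4));

        System.out.println(sumAll(futures, 0).join()); // prints 10

        // No inputs: the Optional from reduce is empty, so the default is used.
        List<CompletableFuture<Integer>> none = Collections.emptyList();
        System.out.println(sumAll(none, 0).join()); // prints 0
    }
}
```

As in the RxJava version, nothing here recovers from failures: if any input fails, the combined result fails too.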
