Making a call using RX and then making multiple bridged calls from the result of the first call

I need to make an API call that returns a list of items. For each item in this list, I have to call a different API (if the list returns 8 items, I will need to make 8 concurrent calls).

Finally, I need to return a list that I will create with the results of each of these 8 concurrent calls.

How can I do this using RxJava? I think I need to use flatMap to convert the result of the first call to a list of Observables and then I have to use the zip operator to make parallel calls, but I'm not sure.

Please note that I am using RxJava2 and no lambdas expressions.

Thank!

+3


source to share


1 answer


you can do it like this, for example defer()

allows you to fetch data only when you subscribe and then create an Observable that emits all the elements (one by one) in the list of elements.
then it flatMap()

will create Observable

one that will fetch data for each item, and now you will Observable

have that emit data objects. to collect it you can use toList()

which will highlight one object (a List

) which will contain everything Data

received by each Observable

.

Note that in order to do this in parallel, it is important to fetchDataFromItem()

subscribe to Schedulers.io()

, even if the entire stream is subscribed to io.

Observable.defer(new Callable<ObservableSource<Item>>() {
        @Override
        public ObservableSource<Item> call() throws Exception {
            List<Item> items = getItems();
            return Observable.fromIterable(items);
        }
    })
            .flatMap(new Function<Item, ObservableSource<Data>>() {
                @Override
                public ObservableSource<Data> apply(@NonNull Item item) throws Exception {
                    return fetchDataFromItem(item);
                }
            })
            .toList()
            .subscribe(new Consumer<List<Data>>() {
                @Override
                public void accept(@NonNull List<Data> objects) throws Exception {
                //do something with the list of all fetched data
                }
            });

      



UPDATE:

in case the selection of elements is already Observable

, defer()

you can replace with flatMapIterable()

, which takes one list of elements and converts it to an observable of several elements:

getItemsObservable()
        .flatMapIterable(new Function<List<Item>, Iterable<Item>>() {
            @Override
            public Iterable<Item> apply(@NonNull List<Item> items) throws Exception {
                return items;
            }
        })
        .flatMap(new Function<Item, ObservableSource<Data>>() {
            @Override
            public ObservableSource<Data> apply(@NonNull Item item) throws Exception {
                return fetchDataFromItem(item);
            }
        })
        .toList()
        .subscribe(new Consumer<List<Data>>() {
            @Override
            public void accept(@NonNull List<Data> objects) throws Exception {
                //do something with the list of all fetched data
            }
        });

      

+3


source







All Articles