How to chain asynchronous operations using RxJava Observable?

I want to make an HTTP request repeatedly and act on each result. I start with public Observable<NewsItem> fetchItems(NewsFeed feed). One request returns multiple news items, but I decided to flatten the result.

The idea was to use Observable.interval() to make the request multiple times and then combine the resulting Observables into one.

       Observable
            .interval(timePerItem, TimeUnit.MILLISECONDS)
            .map(i -> feed)
            .map(feed -> fetchItems(feed))
            .subscribe(result -> System.out.println(result));

      

But the result is Observable<Observable<NewsItem>>, not Observable<NewsItem>. How do I flatten them?

I found the merge() operator (RxJava doc: Merge), but it doesn't seem to be suitable here.

In the previous version I used CompletableFuture<List<NewsItem>> fetchNewsItems(), but I was unable to put it in the Observable chain.



1 answer


Not sure if I understand the problem, but aren't you just looking for flatMap?



Observable
    .interval(timePerItem, TimeUnit.MILLISECONDS)
    .flatMap(i -> fetchItems(feed))
    .subscribe(result -> System.out.println(result));
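
To see why map produces a nested type while flatMap does not, here is a rough sketch using java.util.stream, so it runs without the RxJava dependency. It is only an analogy: Stream.flatMap has the same shape as Observable.flatMap, but it is sequential, whereas the Rx operator merges inner Observables as they emit and may interleave their items. The class name FlatMapSketch, the String-based fetchItems stand-in, and the item names are all hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlatMapSketch {

    // Hypothetical stand-in for fetchItems(feed): one "request" yields several items.
    static Stream<String> fetchItems(String feed) {
        return Stream.of(feed + "-item1", feed + "-item2");
    }

    // With map, every tick keeps its own inner result, so the output is nested
    // (the stream analogue of Observable<Observable<NewsItem>>).
    static List<List<String>> nested(int ticks) {
        return Stream.iterate(0, i -> i + 1)
                .limit(ticks)
                .map(i -> fetchItems("feed").collect(Collectors.toList()))
                .collect(Collectors.toList());
    }

    // With flatMap, the inner results are merged into one flat sequence
    // (the stream analogue of Observable<NewsItem>).
    static List<String> flattened(int ticks) {
        return Stream.iterate(0, i -> i + 1)
                .limit(ticks)
                .flatMap(i -> fetchItems("feed"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(nested(2));    // [[feed-item1, feed-item2], [feed-item1, feed-item2]]
        System.out.println(flattened(2)); // [feed-item1, feed-item2, feed-item1, feed-item2]
    }
}
```

In the Rx version the ticks come from interval() instead of Stream.iterate(), but the type change is the same: flatMap unwraps one level of nesting.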

      
