How to chain asynchronous operations using an RxJava Observable?
I want to repeat an HTTP request and act on each result. I start with public Observable<NewsItem> fetchItems(NewsFeed feed). One request returns multiple news items, but I decided to flatten them into a single stream.
The idea was to use Observable.interval() to make the request multiple times and then combine the resulting Observables into one:
Observable
.interval(timePerItem, TimeUnit.MILLISECONDS)
.map(i -> feed)
.map(feed -> fetchItems(feed))
.subscribe(result -> System.out.println(result));
But the result is Observable<Observable<NewsItem>>, not Observable<NewsItem>. How do I flatten them?
I found the merge() operator (RxJava doc: Merge), but it doesn't seem to be suitable here.
In the previous version I used CompletableFuture<List<NewsItem>> fetchNewsItems(), but I was unable to fit it into the Observable chain.
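The nesting described above comes from using map with a function that itself returns a stream. In RxJava the usual way out is flatMap, which subscribes to each inner Observable and merges its items into the outer one, so replacing the second map in the chain with flatMap should yield Observable<NewsItem>. The sketch below shows the same map versus flatMap difference with the JDK's own Stream type so that it runs without any dependency. It is an analogy, not RxJava code: FlattenSketch, the String items, and this version of fetchItems are hypothetical stand-ins for the real NewsFeed and NewsItem types.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class FlattenSketch {
    // Hypothetical stand-in for fetchItems(feed): one "request" yields several items.
    static Stream<String> fetchItems(String feed) {
        return Stream.of(feed + "-item1", feed + "-item2");
    }

    // map keeps one inner stream per tick, so the result stays nested.
    static List<List<String>> nested(int ticks, String feed) {
        return IntStream.range(0, ticks).boxed()
                .map(i -> fetchItems(feed))                 // Stream<Stream<String>>
                .map(inner -> inner.collect(Collectors.toList()))
                .collect(Collectors.toList());
    }

    // flatMap merges the inner streams into one flat stream of items.
    static List<String> flattened(int ticks, String feed) {
        return IntStream.range(0, ticks).boxed()
                .flatMap(i -> fetchItems(feed))             // Stream<String>
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [[news-item1, news-item2], [news-item1, news-item2]]
        System.out.println(nested(2, "news"));
        // prints [news-item1, news-item2, news-item1, news-item2]
        System.out.println(flattened(2, "news"));
    }
}
```

One difference to keep in mind: Stream.flatMap is sequential and keeps the order of the inner streams, whereas RxJava's flatMap may interleave items from inner Observables that emit concurrently.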