Subscribe to RxJava observable multiple times
I have a question about RxJava Observables. For example, I have a Retrofit interface that returns an Observable, and I need to do something with the stream it emits. Here is the code that downloads the video list and saves it:
API.getVideoListObservable()
    .doOnError(t -> t.printStackTrace())
    .map(r -> r.getObjects())
    .doOnNext(l -> VideoActivity.this.runOnUiThread(() -> fragment.updateVideoList(l)))
    .doOnNext(l -> kalturaVideoList.addAll(l))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();
Here, API is the REST adapter.
If I want to refresh the video list, should I simply run the same chain again? Or should I keep the Subscription, unsubscribe from it, and subscribe again, like this:
Subscription s = API.getVideoListObservable()
    .doOnError(t -> t.printStackTrace())
    .map(r -> r.getObjects())
    .doOnNext(l -> VideoActivity.this.runOnUiThread(() -> fragment.updateVideoList(l)))
    .doOnNext(l -> kalturaVideoList.addAll(l))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();
// some code
...
s.unsubscribe();
s = null;
s = API.getVideoListObservable()
    .doOnError(t -> t.printStackTrace())
    .map(r -> r.getObjects())
    .doOnNext(l -> VideoActivity.this.runOnUiThread(() -> fragment.updateVideoList(l)))
    .doOnNext(l -> kalturaVideoList.addAll(l))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();
Your whole event flow is wrong: you should update the UI in the subscriber, not in doOnNext.
API.getVideoListObservable()
    .map(r -> r.getObjects())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(videos -> {
        // do stuff with videos
        fragment.updateVideoList(videos);
        kalturaVideoList.addAll(videos);
    }, throwable -> {
        // handle errors
        throwable.printStackTrace();
    });
Retrofit takes care of subscribeOn(Schedulers.io()) for you.
Move everything into a function and call it when you need to update the list. There is no need to manually unsubscribe as subscribers are automatically unsubscribed when they receive an onCompleted or onError.
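To make the "one function, called on every refresh" idea concrete, here is a minimal dependency-free sketch. It is not RxJava: CompletableFuture stands in for the one-shot Observable that Retrofit returns, and VideoListRefresher, refresh(), snapshot() and the String video type are hypothetical names chosen for illustration. It also assumes that a refresh should replace the old list rather than append to it, which the original code does not do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Dependency-free model, NOT RxJava: CompletableFuture stands in for the
// one-shot Observable that Retrofit returns.
class VideoListRefresher {
    // Hypothetical stand-in for API.getVideoListObservable().map(r -> r.getObjects()).
    private final Supplier<CompletableFuture<List<String>>> api;
    private final List<String> videoList = new ArrayList<>();

    VideoListRefresher(Supplier<CompletableFuture<List<String>>> api) {
        this.api = api;
    }

    // Every call asks the API for a fresh request, so there is nothing to
    // unsubscribe from or re-subscribe to between refreshes.
    CompletableFuture<Void> refresh() {
        return api.get()
                .thenAccept(videos -> {
                    synchronized (videoList) {
                        videoList.clear();   // assumption: a refresh replaces the old list
                        videoList.addAll(videos);
                    }
                })
                .exceptionally(t -> {
                    t.printStackTrace();     // handle errors
                    return null;
                });
    }

    // Defensive copy so callers never see the list mid-update.
    List<String> snapshot() {
        synchronized (videoList) {
            return new ArrayList<>(videoList);
        }
    }

    public static void main(String[] args) {
        VideoListRefresher r = new VideoListRefresher(
                () -> CompletableFuture.completedFuture(Arrays.asList("intro.mp4", "demo.mp4")));
        r.refresh().join();
        r.refresh().join();                  // second refresh goes through the same function
        System.out.println(r.snapshot());
    }
}
```

In the RxJava version the body of refresh() would simply be the subscribe(...) chain shown above; the point is only that each refresh builds and consumes a new one-shot request.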
Remember that when using RxJava on Android you have to deal with Activity / Fragment lifecycle and configuration changes. You have to keep track of pending network calls and manually unsubscribe when the user navigates to another Activity / Fragment.
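In RxJava 1 the usual tool for this bookkeeping is a CompositeSubscription: add each Subscription to it and call unsubscribe() or clear() from the lifecycle callback. The sketch below models the same idea without any dependencies, so it is not RxJava or Android code: CompletableFuture stands in for a pending network call, and PendingCalls, track() and cancelAll() are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Dependency-free model, NOT RxJava or Android: tracks pending calls so that
// results arriving after the owner is gone are dropped.
class PendingCalls {
    private final List<CompletableFuture<?>> pending = new ArrayList<>();
    private volatile boolean destroyed = false;

    // Register a call; the result is delivered only while the owner is alive.
    synchronized <T> void track(CompletableFuture<T> call, Consumer<T> onResult) {
        pending.add(call.thenAccept(value -> {
            if (!destroyed) {
                onResult.accept(value);
            }
        }));
    }

    // Call this from the lifecycle hook (for example onDestroy() in an Activity).
    // It cancels our handles only; it does not abort the underlying source future.
    synchronized void cancelAll() {
        destroyed = true;
        for (CompletableFuture<?> call : pending) {
            call.cancel(true);
        }
        pending.clear();
    }

    public static void main(String[] args) {
        PendingCalls calls = new PendingCalls();
        CompletableFuture<String> request = new CompletableFuture<>();
        calls.track(request, v -> System.out.println("got " + v));
        calls.cancelAll();            // user navigated away
        request.complete("videos");   // the late response is ignored
    }
}
```

A real implementation would also drop completed calls from the list; that is left out here to keep the sketch short.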
My preferred way of solving all the RxJava + Android problems I've encountered so far is to use RxLoader (https://github.com/evant/rxloader). With RxLoader, your code will look like this:
private Observable<List<Video>> getVideos() {
    return API.getVideoListObservable()
        .map(r -> r.getObjects());
}

// in onCreate()
RxLoaderManager loaderManager = RxLoaderManager.get(this);
RxLoader<List<Video>> videoLoader = loaderManager.create(
    getVideos().observeOn(AndroidSchedulers.mainThread()),
    new RxLoaderObserver<List<Video>>() {
        @Override
        public void onNext(List<Video> videos) {
            // do stuff with videos
            fragment.updateVideoList(videos);
            kalturaVideoList.addAll(videos);
        }

        @Override
        public void onError(Throwable throwable) {
            // handle errors
            throwable.printStackTrace();
        }
    });

// whenever you need to update videos
videoLoader.restart();