Subscribe to RxJava observable multiple times

I have a question about RxJava Observables. For example, I have a Retrofit interface that returns an Observable, and I need to process the list of videos it emits. Here is the code that downloads the videos and saves them to a list:

API.getVideoListObservable()
                .doOnError(t -> t.printStackTrace())
                .map(r -> r.getObjects())
                .doOnNext(l -> VideoActivity.this.runOnUiThread(() -> fragment.updateVideoList(l)))
                .doOnNext(l -> kalturaVideoList.addAll(l))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe();

      

API is the Retrofit REST adapter.

If I want to refresh the video list, should I simply run the same chain again? Or should I keep the Subscription, unsubscribe from it, and subscribe again, like this:

Subscription s = API.getVideoListObservable()
                .doOnError(t -> t.printStackTrace())
                .map(r -> r.getObjects())
                .doOnNext(l -> VideoActivity.this.runOnUiThread(() -> fragment.updateVideoList(l)))
                .doOnNext(l -> kalturaVideoList.addAll(l))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe();

//some code
...

s.unsubscribe();
s = null;
s = API.getVideoListObservable()
                .doOnError(t -> t.printStackTrace())
                .map(r -> r.getObjects())
                .doOnNext(l -> VideoActivity.this.runOnUiThread(() -> fragment.updateVideoList(l)))
                .doOnNext(l -> kalturaVideoList.addAll(l))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe();

      



1 answer


Your whole event flow is wrong: you should update the UI in the subscriber, not in doOnNext.

API.getVideoListObservable()
    .map(r -> r.getObjects())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(videos -> {
        // do stuff with videos
        fragment.updateVideoList(videos);
        kalturaVideoList.addAll(videos);
    }, throwable -> {
        // handle errors
        throwable.printStackTrace();
    });

      

Retrofit takes care of subscribeOn(Schedulers.io()) for you.

Move everything into a function and call it whenever you need to update the list. There is no need to unsubscribe manually once a call has finished, because subscribers are unsubscribed automatically when they receive onCompleted or onError.



Remember that when using RxJava on Android you have to deal with Activity / Fragment lifecycle and configuration changes. You have to keep track of pending network calls and manually unsubscribe when the user navigates to another Activity / Fragment.
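The bookkeeping described above can be sketched in plain Java. This is only an illustration of the pattern, not RxJava's API: PendingCalls, Cancellable, and newCall() are hypothetical names chosen so the example runs without Android or RxJava on the classpath. In RxJava 1 the same roles are played by rx.Subscription and rx.subscriptions.CompositeSubscription.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical plain-JDK stand-in for "track pending calls, cancel them on exit".
 * It models the idea behind RxJava 1's CompositeSubscription; it is not that class.
 */
final class PendingCalls {

    /** Minimal handle for one in-flight call (plays the role of rx.Subscription). */
    interface Cancellable {
        void cancel();
        boolean isCancelled();
    }

    private final List<Cancellable> pending = new ArrayList<>();

    /** Remember a call so that it can be cancelled later. */
    synchronized void add(Cancellable call) {
        pending.add(call);
    }

    /** Cancel every tracked call and forget it; intended for onDestroy(). */
    synchronized void cancelAll() {
        for (Cancellable call : pending) {
            call.cancel();
        }
        pending.clear();
    }

    /** Number of calls currently tracked. */
    synchronized int size() {
        return pending.size();
    }

    /** Flag-based handle, used here only to demonstrate the bookkeeping. */
    static Cancellable newCall() {
        return new Cancellable() {
            private volatile boolean cancelled;

            @Override
            public void cancel() {
                cancelled = true;
            }

            @Override
            public boolean isCancelled() {
                return cancelled;
            }
        };
    }
}
```

In an Activity, the idea would be to add the handle returned by each subscribe() call to such a container and to call cancelAll() from onDestroy(). With RxJava 1 itself, the equivalent is to add each Subscription to a CompositeSubscription and call clear() or unsubscribe() on it when the Activity or Fragment goes away.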

My preferred way of solving all the RxJava + Android problems I've encountered so far is using https://github.com/evant/rxloader. Your code using RxLoader will look like this:

private Observable<List<Video>> getVideos() {
    return API.getVideoListObservable()
        .map(r -> r.getObjects());
}

// in onCreate()
RxLoaderManager loaderManager = RxLoaderManager.get(this);
RxLoader<List<Video>> videoLoader = loaderManager.create(
    getVideos().observeOn(AndroidSchedulers.mainThread()),
    new RxLoaderObserver<List<Video>>() {
        @Override
        public void onNext(List<Video> videos) {
            // do stuff with videos
            fragment.updateVideoList(videos);
            kalturaVideoList.addAll(videos);
        }

        @Override
        public void onError(Throwable throwable) {
            // handle errors
            throwable.printStackTrace();
        }
    });

// whenever you need to update videos
videoLoader.restart();

      
