RxJava concat when first Observable is empty
I have a local repository and a remote repository from where I am fetching data. This is the first time I want to fetch data from a remote repository and then cache it and fetch from a local database. I am using 2 Observables and the concat method for this:
Observable.concat(localWeatherForecast, remoteWeatherForecast)
.filter(weatherForecasts -> !weatherForecasts.isEmpty())
.first();
For local observables, I use this:
private Observable<List<WeatherForecast>> getAndCacheLocalWeather() {
return mLocalDataSource.getWeather()
.flatMap(new Func1<List<WeatherForecast>, Observable<List<WeatherForecast>>>() {
@Override
public Observable<List<WeatherForecast>> call(List<WeatherForecast> weatherEntries) {
return Observable.from(weatherEntries)
.doOnNext(entry -> mCachedWeather.add(entry))
.toList();
}
});
}
and for the remote:
return mRemoteDataSource.getWeather()
.flatMap(new Func1<List<WeatherForecast>, Observable<List<WeatherForecast>>>() {
@Override
public Observable<List<WeatherForecast>> call(List<WeatherForecast> weather) {
return Observable.from(weather).doOnNext(entry -> {
mLocalDataSource.saveWeatherEntry(entry);
mCachedWeather.add(entry);
}).toList();
}
})
.doOnCompleted(() -> mCacheIsDirty = false);
and here is the subscription
Subscription subscription = mRepository
.getWeather()
.subscribeOn(mSchedulerProvider.computation())
.observeOn(mSchedulerProvider.ui())
.subscribe(
// onNext
this::processWeather,
// onError
throwable -> mWeatherView.showLoadingTasksError(),
// onCompleted
() -> mWeatherView.setLoadingIndicator(false));
It seems that concat does not move to the second observable (remote) even if the local repository is empty. If I reverse the order then it works (remote being first in concat). I tried to remove the / first () filter method, but even so that the remote observable is not being handled.
Any ideas why? Thank!
source to share
you can provide a timeout for the first observable. If the first observable never ends, it doesn't switch to the second. Please see the testing methods I provide:
The test method "never_false" will not return any values ββand time after 1000ms because no values ββwere pressed.
For the 'never_true' method, the first observable in concat will time out after a while and switch to the onComplete observable. Therefore, concat switches to the second observable and takes the first item from this stream.
@Test
public void never_false() throws Exception {
Observable<List<Integer>> never = Observable.never();
Observable<List<Integer>> just = Observable.just(Arrays.asList(1, 2, 3));
Observable<List<Integer>> concat = Observable.concat(never, just);
boolean await = concat.firstElement().test().await(1000, TimeUnit.MILLISECONDS);
assertThat(await).isTrue();
}
@Test
public void never_true() throws Exception {
Observable<List<Integer>> never = Observable.<List<Integer>>never()
.timeout(50, TimeUnit.MILLISECONDS)
.onErrorResumeNext(Observable.empty());
Observable<List<Integer>> just = Observable.just(Arrays.asList(1, 2, 3));
TestObserver<List<Integer>> test = Observable.concat(never, just)
.test()
.await()
.assertComplete()
.assertNoErrors();
List<Integer> collect = test.values().stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
assertThat(collect).contains(1, 2, 3);
}
source to share
concat()
will wait until the first Observable
one emits everything and then listens for the second one Observable
. If the former Observable
cannot emit anything, i.e. It is empty, then the second Observable
will not be considered.
You can start your first stream with the unwanted element and then skip the first emission of the stream:
Observable.concat(
Observable.<Integer>empty().startWith(-1),
Observable.fromArray(1, 2, 3))
.skip(1)
.test()
.assertResult(1, 2, 3);
Now the first one Observable
will issue -1
and there will be nothing after that, which is normal for the operator concat()
to work properly. Later, you let in the first radiation that you are not interested in.
source to share