RxJava concat when first Observable is empty

I have a local repository and a remote repository from which I fetch data. The first time, I want to fetch the data from the remote repository, cache it, and from then on fetch it from the local database. I am using 2 Observables and the concat method for this:

Observable.concat(localWeatherForecast, remoteWeatherForecast)
                .filter(weatherForecasts -> !weatherForecasts.isEmpty())
                .first();

      

For the local observable, I use this:

private Observable<List<WeatherForecast>> getAndCacheLocalWeather() {
    return mLocalDataSource.getWeather()
            .flatMap(new Func1<List<WeatherForecast>, Observable<List<WeatherForecast>>>() {
                @Override
                public Observable<List<WeatherForecast>> call(List<WeatherForecast> weatherEntries) {
                    return Observable.from(weatherEntries)
                            .doOnNext(entry -> mCachedWeather.add(entry))
                            .toList();
                }
            });
}

      

and for the remote:

return mRemoteDataSource.getWeather()
            .flatMap(new Func1<List<WeatherForecast>, Observable<List<WeatherForecast>>>() {
                @Override
                public Observable<List<WeatherForecast>> call(List<WeatherForecast> weather) {
                    return Observable.from(weather).doOnNext(entry -> {
                        mLocalDataSource.saveWeatherEntry(entry);
                        mCachedWeather.add(entry);
                    }).toList();
                }
            })
            .doOnCompleted(() -> mCacheIsDirty = false);

      

and here is the subscription:

Subscription subscription = mRepository
            .getWeather()
            .subscribeOn(mSchedulerProvider.computation())
            .observeOn(mSchedulerProvider.ui())
            .subscribe(
                    // onNext
                    this::processWeather,
                    // onError
                    throwable -> mWeatherView.showLoadingTasksError(),
                    // onCompleted
                    () -> mWeatherView.setLoadingIndicator(false));

      

It seems that concat does not move on to the second observable (remote) even when the local repository is empty. If I reverse the order (remote first in concat), it works. I also tried removing the filter() and first() calls, but even then the remote observable is not processed.

Any ideas why? Thanks!

+3



2 answers


You can apply a timeout to the first observable. If the first observable never completes, concat never switches to the second one. See the test methods below:

In the test method "never_false", no values are received and the wait times out after 1000 ms, because no values were emitted.



In the "never_true" method, the first observable in concat times out after 50 ms, and onErrorResumeNext replaces the resulting error with an empty observable that completes immediately. concat therefore switches to the second observable and emits its item.

@Test
public void never_false() throws Exception {
    Observable<List<Integer>> never = Observable.never();

    Observable<List<Integer>> just = Observable.just(Arrays.asList(1, 2, 3));
    Observable<List<Integer>> concat = Observable.concat(never, just);

    // await(...) returns false when the timeout elapses before a terminal event arrives.
    boolean await = concat.firstElement().test().await(1000, TimeUnit.MILLISECONDS);

    assertThat(await).isFalse();
}

@Test
public void never_true() throws Exception {
    Observable<List<Integer>> never = Observable.<List<Integer>>never()
            .timeout(50, TimeUnit.MILLISECONDS)
            .onErrorResumeNext(Observable.empty());

    Observable<List<Integer>> just = Observable.just(Arrays.asList(1, 2, 3));

    TestObserver<List<Integer>> test = Observable.concat(never, just)
            .test()
            .await()
            .assertComplete()
            .assertNoErrors();

    List<Integer> collect = test.values().stream()
            .flatMap(Collection::stream)
            .collect(Collectors.toList());

    assertThat(collect).contains(1, 2, 3);
}
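
To make the rule behind both tests explicit, here is a minimal plain-Java sketch of how concat sequences its sources. It is only a toy model, not RxJava's implementation: the names ConcatModel and Source are hypothetical, and a "source" is reduced to a fixed list of items plus a flag saying whether it ever signals completion. Under those assumptions, a first source that is empty but completes lets the second one run, while a first source that never completes blocks it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ConcatModel {

    // Hypothetical stand-in for an Observable: fixed items plus
    // whether a completion signal is ever sent.
    public static final class Source {
        final List<Integer> items;
        final boolean completes;

        Source(List<Integer> items, boolean completes) {
            this.items = items;
            this.completes = completes;
        }

        // Emits nothing and completes immediately (like Observable.empty()).
        public static Source empty() {
            return new Source(Collections.<Integer>emptyList(), true);
        }

        // Emits nothing and never completes (like Observable.never()).
        public static Source never() {
            return new Source(Collections.<Integer>emptyList(), false);
        }

        // Emits the given items and then completes.
        public static Source just(Integer... items) {
            return new Source(Arrays.asList(items), true);
        }
    }

    // Models concat's ordering rule: drain the current source and move on
    // to the next one only after the current source has completed.
    public static List<Integer> concat(Source... sources) {
        List<Integer> emitted = new ArrayList<>();
        for (Source source : sources) {
            emitted.addAll(source.items);
            if (!source.completes) {
                break; // later sources are never subscribed to
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Empty but completed first source: the second source is reached.
        System.out.println(concat(Source.empty(), Source.just(1, 2, 3))); // [1, 2, 3]
        // First source never completes: the second source is never reached.
        System.out.println(concat(Source.never(), Source.just(1, 2, 3))); // []
    }
}
```

In terms of the question, this suggests checking whether the local observable ever completes, rather than whether it is empty.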

      

+2



concat() will wait until the first Observable emits everything and only then listens to the second Observable. If the first Observable cannot emit anything, i.e. it is empty, then the second Observable will not be considered.

You can start your first stream with the unwanted element and then skip the first emission of the stream:



Observable.concat(
            Observable.<Integer>empty().startWith(-1),
            Observable.fromArray(1, 2, 3))
        .skip(1)
        .test()
        .assertResult(1, 2, 3);

      

Now the first Observable will emit -1 and nothing after that, which is enough for the concat() operator to work properly. Afterwards, you skip that first emission, which you are not interested in.

0

