Asynchronous cleanup on observable sequence completion

I need to execute an asynchronous method (such as a cleanup job) when an observable completes or fails. Also, if the cleanup method fails, the observable chain should fail as well.

Is there a standard way to do this?

Suppose I have an Observable sequence source and an asyncCleanup() method that returns an Observable of the cleanup result.

Side effect methods like doOnCompleted / doOnTerminate / doOnUnsubscribe don't seem to fit:

source.doOnUnsubscribe(()->asyncCleanup().subscribe());

      

The observable chain will succeed even if asyncCleanup() fails. Therefore asyncCleanup() must be part of the same chain.

The best I have come up with is the following:

source.onErrorResumeNext(err->asyncCleanup()
         .flatMap(cleanupResult->Observable.error(err)))
      .concatWith(asyncCleanup()
         .flatMap(cleanupResult->Observable.empty()));

      

On failure, onErrorResumeNext calls asyncCleanup() and then re-emits the original error. On success, the source is concatenated with asyncCleanup() mapped to an empty sequence. Unfortunately, this won't work if there is a take() or a similar limiting operator downstream, which could result in the concatenated observable never being subscribed to.

UPDATE 2017-08-01: This question is about observable sequences in particular. For a single-item observable source, the solution is quite simple:

singleItemSource
    .onErrorResumeNext(err->asyncCleanup()
        .flatMap(cleanupResult->Observable.error(err)))
    .flatMap(single->asyncCleanup()
        .map(cleanupResult->single));
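
To experiment with these single-item semantics without RxJava on the classpath, here is a minimal sketch of the same idea using the JDK's CompletableFuture in place of Observable. It is an analogy, not the RxJava code above: the class name CleanupSketch and the helper withCleanup are hypothetical, asyncCleanup is passed in as a Supplier, and Java 9+ is assumed (for failedFuture). The cleanup runs whether the source succeeds or fails, the source error is re-emitted after cleanup, and a cleanup failure fails the whole chain.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical helper, JDK-only; not part of RxJava.
final class CleanupSketch {

    // Runs asyncCleanup once the source has settled (success or failure).
    // If the cleanup fails, the returned future fails with the cleanup error.
    // Otherwise the original value or the original error is passed through.
    static <T> CompletableFuture<T> withCleanup(
            CompletableFuture<T> source,
            Supplier<CompletableFuture<Void>> asyncCleanup) {
        return source
            .<CompletableFuture<T>>handle((value, err) ->
                asyncCleanup.get().<T>thenCompose(ignored -> err != null
                    ? CompletableFuture.<T>failedFuture(err)
                    : CompletableFuture.<T>completedFuture(value)))
            // Flatten the nested future produced by handle().
            .<T>thenCompose(f -> f);
    }

    public static void main(String[] args) {
        CompletableFuture<String> result = withCleanup(
            CompletableFuture.completedFuture("item"),
            () -> CompletableFuture.<Void>completedFuture(null));
        System.out.println(result.join());
    }
}
```

Calling join() on the result throws a CompletionException whose cause is either the source error or the cleanup error, so the caller cannot observe success when the cleanup has failed.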

      

3 answers


The idea you are proposing violates the Observable contract because:

Rx Design Guidelines 6.8 - Unsubscribing shouldn't throw

The only thing that comes to mind to solve your problem is to make asyncCleanup() return the same type as the preceding sequence and, when the cleanup succeeds, return Observable.empty() so that you can use concat.



public Observable<Long> asyncCleanup() {
...
    return Observable.empty();
}

//Concat asyncCleanup() to be executed after the sequence
Observable.rangeLong(1, 10)
            .concatWith(asyncCleanup())
            .subscribe(System.out::println, System.out::println);

      

Hope it helps ...


There is a flatMap overload that allows you to map upstream onXXX events to Observables:



Observable.range(1, 10)
    .flatMap(Observable::just, e -> asyncCleanup(), () -> asyncCleanup())
    .subscribe(...);

      


Another solution, which I mentioned in a comment, is the factory method Observable.using. Here's a simple example of how it works:

Observable<Long> range = Observable.rangeLong(1, 10);

Observable.using(() -> range, //The resource provider
            observable -> observable, //The observable factory, built from the resource
            observable -> asyncCleanup() //The resource disposer
                     .subscribe(System.out::println, System.out::println))
    .subscribe(System.out::println, System.out::println);

      

The downside here is that you have to handle both the success and the errors of asyncCleanup() inside the disposer, without propagating or throwing them. With this approach, you should consider moving away from Observables for the asyncCleanup() method.
