Asynchronous cleanup on observable sequence completion
I need to execute an asynchronous method (like a cleanup job) on an observable completion / failure. Also, if the cleanup method fails, the monitored rung should fail as well.
Is there a standard way to do this?
Suppose I have a sequential Observable source and asyncCleanup () method that returns an observable cleanup result.
Side effect methods like doOnCompleted / doOnTerminate / doOnUnsubscribe don't seem to fit:
source.doOnUnsubscribe(()->asyncCleanup().subscribe());
The observable chain will succeed even if asyncCleanup () fails. Therefore asyncCleanup () must be part of the same chain.
The best I have come up with the following:
source.onErrorResumeNext(err->asyncCleanup()
.flatMap(cleanupResult->Observable.error(err)))
.concatWith(asyncCleanup()
.flatMap(cleanupResult->Observable.empty()));
On failure, onErrorResumeNext will call asyncCleanup () and revert to the original error. If successful, we can concatenate with asyncCleanup () mapped to an empty sequence. Unfortunately this won't work if there is take () or a similar downstream constraint operator, which could result in the concatenated observables not even being signed.
UPDATE 2017-08-01 : This is about observable sequences in particular. For a single element of an observable source, the solution is quite simple:
singleItemSource
.onErrorResumeNext(err->asyncCleanup()
.flatMap(cleanupResult->Observable.error(err)))
.flatMap(single->asyncCleanup()
.map(cleanupResult->single));
source to share
The idea you are proposing violates the contract Observable
because:
Rx Design Guidelines 6.8 - Unsubscribing shouldn't throw
The only thing that comes to my mind to solve your problem is to convert asyncCleanup()
to the same type the previous sequence, and in case of a successful return like Observable.empty () you can use concat
.
public Observable<Long> asyncCleanup() {
...
return Observable.empty();
}
//Concat asyncCleanup() to be executed after the sequence
Observable.rangeLong(1, 10)
.concatWith(asyncCleanup())
.subscribe(System.out::println, System.out::println);
Hope it helps ...
source to share
Another solution I have commented is the factory method Observable.using
. Here's a simple example of how it works:
Observable<Long> range = Observable.rangeLong(1, 10);
Observable.using(() -> range, //The resource provider
observable -> observable, //The resource
observable -> asyncCleanup() //The resource disposer
.subscribe(System.out::println, System.out::println))
.subscribe(System.out::println, System.out::println);
The downside here is that you have to handle success and errors asyncCleanup()
without spreading or throwing. With this approach, you should consider moving away from Observables in the case of a method asyncCleanup()
.
source to share