RxJava: How to interrupt a stream on unsubscribe?
I am using Observable.create() to create an observable that does some work on a scheduler (such as Schedulers.io()) and then delivers the result on AndroidSchedulers.mainThread().
val subscription = observable<T> {
    try {
        // perform action synchronously
        it.onNext(action.invoke(context, args))
        it.onCompleted()
    } catch (t: Exception) {
        it.onError(t)
    }
}.subscribeOn(scheduler)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        {
            // handle result here
            result.set(it)
        },
        {
            // handle error here
            errorHandler.handleTaskError(model, this, it)
        },
        {
            // notify completed
            model.completeTask(this)
        }
    )
The operation inside action.invoke() is synchronous and may be a blocking I/O operation. When the user decides to cancel it, I unsubscribe from the observable:
subscription.unsubscribe()
However, the I/O operation is not interrupted. Is there any RxJava API to abort the operation?
When you call yourSubscription.unsubscribe(), Rx invokes your unsubscription code. That code is a Subscription, which you can add to the subscriber when you create the Observable.
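Observable<Object> obs = Observable.create(subscriber -> {
    subscriber.add(new Subscription() {
        // tracks whether unsubscribe() has been called
        private volatile boolean unsubscribed;

        @Override
        public void unsubscribe() {
            unsubscribed = true;
            // perform unsubscription, e.g. cancel or interrupt the blocking work
        }

        @Override
        public boolean isUnsubscribed() {
            return unsubscribed;
        }
    });
    subscriber.onNext(/**...*/);
    subscriber.onCompleted();
});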
So, in the unsubscribe() method, you can interrupt your work if you have a way to do so.
Note that unsubscribe() is called both when you unsubscribe from the Observable and when the Observable completes (it then unsubscribes itself).
Edit: updated to take Vladimir Mironov's comment into account.
RxJava schedulers use Future.cancel(true) to abort the currently running action, so if you have an interrupt-sensitive operation (for example Thread.sleep()), you should see the thread being interrupted. There is not much you can do with that, though, because by then the whole downstream chain has been unsubscribed and no further events can reach your subscriber.
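The effect of Future.cancel(true) on an interrupt-sensitive call can be sketched with plain java.util.concurrent, without RxJava. The class and method names below (FutureCancelDemo, cancelInterruptsSleep) are hypothetical and only illustrate the mechanism; they are not part of RxJava.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class: shows that cancel(true) interrupts a sleeping worker.
final class FutureCancelDemo {

    /** Returns true if the task saw an InterruptedException after cancel(true). */
    static boolean cancelInterruptsSleep() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        try {
            Future<?> future = executor.submit(() -> {
                started.countDown();
                try {
                    Thread.sleep(60_000); // stands in for interrupt-sensitive work
                } catch (InterruptedException e) {
                    interrupted.set(true); // cancel(true) interrupted the worker thread
                } finally {
                    finished.countDown();
                }
            });
            started.await();      // wait until the task is actually running
            future.cancel(true);  // true = interrupt the thread running the task
            finished.await(5, TimeUnit.SECONDS);
            return interrupted.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("interrupted = " + cancelInterruptsSleep());
    }
}
```

A blocking call that does not respond to interruption would keep running in the same situation, which is the case the next paragraph addresses.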
If your action is not interrupt-sensitive and offers no standard means of aborting it (such as calling close() on a socket or cancel() on a JDBC statement), there is not much RxJava can do about it.
If you can call the close() method asynchronously, consider using using(), which lets you specify a cleanup callback that can close your resource when you unsubscribe.
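The underlying idea, closing the resource from another thread so that the blocked call fails, can be sketched without RxJava. The example below uses a java.net.ServerSocket blocked in accept() as a stand-in for blocking I/O; CloseToAbortDemo and closeAbortsAccept are hypothetical names, and the 200 ms pause is only an assumption to let the worker reach the blocking call. It does not use using() itself; it shows what a cleanup callback would typically do.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: aborts a blocking accept() by closing the socket.
final class CloseToAbortDemo {

    /** Returns true if closing the socket made the blocked accept() fail. */
    static boolean closeAbortsAccept() {
        AtomicReference<IOException> failure = new AtomicReference<>();
        CountDownLatch finished = new CountDownLatch(1);
        try (ServerSocket server =
                 new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Thread worker = new Thread(() -> {
                try {
                    server.accept(); // blocks until a client connects or the socket is closed
                } catch (IOException e) {
                    failure.set(e);  // close() from another thread ends up here
                } finally {
                    finished.countDown();
                }
            });
            worker.setDaemon(true);
            worker.start();

            Thread.sleep(200); // assumption: enough time for the worker to block
            server.close();    // what an unsubscribe/cleanup callback would do

            return finished.await(5, TimeUnit.SECONDS) && failure.get() != null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("accept aborted = " + closeAbortsAccept());
    }
}
```

In an RxJava chain, the server.close() call would sit in the cleanup callback (or in a Subscription added to the subscriber), so that unsubscribing triggers it.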