RxJava: How to interrupt a stream on unsubscribe?

I am using Observable.create()

to create an observable to do some work on the scheduler (like Schedulers.io()

) and then return the result on AndroidSchedulers.mainThread()

.

val subscription = observable<T> {
        try {
            // perform action synchronously
            it.onNext(action.invoke(context, args))
            it.onCompleted()
        } catch (t: Exception) {
            it.onError(t)
        }
    }.subscribeOn(scheduler)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(
                    {
                        // handle result here
                        result.set(it)
                    },
                    {
                        // handle error here
                        errorHandler.handleTaskError(model, this, it)
                    },
                    {
                        // notify completed
                        model.completeTask(this)
                    }
            )

      

The operation internally action.invoke()

is synchronous and can be a blocking I / O operation. When the user decides to cancel it, I'll unsubscribe from the observable:subscription.unsubscribe()

However, the I / O operation is not interrupted. Is there any rx-java API to abort the operation?

+3


source to share


3 answers


When you call yourSubscription.unsubscribe();

, Rx will invoke the unsubscribe code.

This code for beginners will be Subscription

, which you can add

on subscriber

when creating Observable

.

Observable<Object> obs = Observable.create(subscriber -> {

      subscriber.add(new Subscription() {
            @Override
            public void unsubscribe() {
                 // perform unsubscription
            }

            @Override
            public boolean isUnsubscribed() {
                return false;
            }
        });
      subscriber.onNext(/**...*/);
      subscriber.onCompleted();
}

      



So, in a method, unsubscribe

you can interrupt your work if you have a way to do it.

Note that the unsubscribe method is called when you unsubscribe from an Observable, or when An Observable completes. (he will unsubscribe himself)

edit : accepting vladimir mironov 's account comment

+9


source


RxJava schedulers use Furure.cancel (true) to abort the current action, so if you have an interrupt sensitive operation (for example Thread.sleep()

), you should see the thread interrupt, although you can't do much because at this point the whole chain the downstream has been unsubscribed and no further events can reach your subscriber.

If your action is not interrupt sensitive and does not provide any standard means to interrupt it (for example, a call close()

on a socket or cancel()

in a JDBC statement), RxJava cannot do much about it.



If you can call the close () method asynchronously, consider using using()

that allows you to specify a cleanup callback that can close your resource when you unsubscribe.

+3


source


As far as I know, this is not possible with RxJava.

You can keep the reference to the object action

outside the observable and abort along with the call unsubscribe

.

0


source







All Articles