How to clean up an Observable created with the Observable.create(OnSubscribe) method
I have the following code, which creates a custom Observable using Observable.create(OnSubscribe):
import java.util.Scanner;
import java.util.Timer;
import java.util.TimerTask;
import rx.Observable;
import rx.Subscription;

public class Main {
    public static void main(String[] args) {
        Subscription subscription = Observable
            .create(subscriber -> {
                Timer timer = new Timer();
                TimerTask task = new TimerTask() {
                    @Override
                    public void run() {
                        subscriber.onNext("tick! tack!");
                    }
                };
                // emit immediately, then once per second
                timer.scheduleAtFixedRate(task, 0L, 1000L);
            })
            .subscribe(System.out::println);
        // block until the user presses Enter
        new Scanner(System.in).nextLine();
        System.err.println("finishing");
        subscription.unsubscribe();
    }
}
The Observable emits a string every second using a timer. When the user presses Enter, the subscription is canceled.
However, the timer keeps running. How can I cancel the timer? I assume there is a hook somewhere, but I cannot find it.
In .NET, the subscribe function passed to create returns an IDisposable, which could be my own implementation that stops the timer. I'm not sure how to map that to RxJava, since its counterpart returns void.
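The behavior described in the question can be reproduced without RxJava at all, which may help isolate the cause: a java.util.Timer keeps firing until cancel() is called on it, regardless of whether anyone is still listening. The following is a minimal sketch under that assumption; the class name TimerLeakDemo and the method runsAfterStop are hypothetical names chosen for illustration, and the sleep intervals are arbitrary.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;

public class TimerLeakDemo {

    // Small helper so callers do not have to handle InterruptedException.
    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /**
     * Starts a 10 ms timer, "stops listening" after 100 ms, and returns how
     * many times the task still ran during a later 100 ms window.
     * If cancelTimer is false, nothing is done to the timer at that point,
     * which mirrors unsubscribing without a cleanup hook.
     */
    public static int runsAfterStop(boolean cancelTimer) {
        AtomicInteger runs = new AtomicInteger();
        Timer timer = new Timer();
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                runs.incrementAndGet();
            }
        }, 0L, 10L);

        pause(100);              // let the timer tick for a while
        if (cancelTimer) {
            timer.cancel();      // the cleanup step the question is missing
        }
        pause(50);               // let any in-flight run finish
        int before = runs.get();
        pause(100);
        int after = runs.get();

        timer.cancel();          // always stop the timer thread so the JVM can exit
        return after - before;
    }

    public static void main(String[] args) {
        System.out.println("runs without cancel: " + runsAfterStop(false));
        System.out.println("runs with cancel:    " + runsAfterStop(true));
    }
}
```

With cancel() the count in the later window should be zero; without it, the task keeps running. The answers show where to hook that cancel() call into the RxJava subscription.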
A more declarative (and IMHO easier to read) solution would be to use the Observable.using method:
Observable<String> obs = Observable.using(
    // resource factory:
    () -> new Timer(),
    // observable factory:
    timer -> Observable.create(subscriber -> {
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                subscriber.onNext("tick! tack!");
            }
        };
        timer.scheduleAtFixedRate(task, 0L, 1000L);
    }),
    // dispose action:
    timer -> timer.cancel()
);
You declare how the dependent resource (Timer) is created, how it is used to create the Observable, and how it is disposed of. RxJava then takes care of creating the timer on subscription and cancelling it on unsubscription.
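To make the create/use/dispose lifecycle concrete, here is a rough plain-Java model of the idea, with no RxJava dependency. It is only a sketch: the helper named using below is hypothetical and is not RxJava's implementation. Calling it stands in for subscribing, and running the returned Runnable stands in for unsubscribing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class UsingSketch {

    /**
     * Hypothetical helper modelled on the three arguments described above.
     * It creates the resource, hands it to the "use" step, and returns a
     * Runnable that disposes of the resource when invoked.
     */
    public static <R> Runnable using(Supplier<R> resourceFactory,
                                     Consumer<R> use,
                                     Consumer<R> dispose) {
        R resource = resourceFactory.get(); // one resource per "subscription"
        use.accept(resource);
        return () -> dispose.accept(resource);
    }

    /** Runs one subscribe/unsubscribe cycle and records the order of events. */
    public static List<String> demo() {
        List<String> log = new ArrayList<>();
        Runnable unsubscribe = using(
                () -> { log.add("create"); return new Timer(); },
                timer -> log.add("use"),
                timer -> { log.add("dispose"); timer.cancel(); });
        unsubscribe.run();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [create, use, dispose]
    }
}
```

The point of the pattern is that the code that acquires the Timer and the code that cancels it are declared side by side, so the cleanup cannot be forgotten at the call site.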
You can add code that will be called when you unsubscribe from the stream. To do this, add a new Subscription to your subscriber inside the create method:
subscriber.add(new Subscription() {
    @Override
    public void unsubscribe() {
        // stop the timer here
    }

    @Override
    public boolean isUnsubscribed() {
        // should report whether unsubscribe() has been called;
        // hard-coded to false here for brevity
        return false;
    }
});
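The mechanism behind this hook can be sketched in plain Java, without RxJava. The Handle class below is a hypothetical stand-in for a subscription, not RxJava's own type: it runs its cleanup action at most once and, unlike the hard-coded example above, tracks whether it has been unsubscribed.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class CleanupHookSketch {

    /** Hypothetical stand-in for a subscription with a cleanup action. */
    public static final class Handle {
        private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
        private final Runnable cleanup;

        Handle(Runnable cleanup) {
            this.cleanup = cleanup;
        }

        public void unsubscribe() {
            // compareAndSet ensures the cleanup runs once, even on repeated calls
            if (unsubscribed.compareAndSet(false, true)) {
                cleanup.run();
            }
        }

        public boolean isUnsubscribed() {
            return unsubscribed.get();
        }
    }

    /** Starts a ticker; the returned handle cancels the timer on unsubscribe(). */
    public static Handle tick(long periodMillis, Consumer<String> onNext) {
        Timer timer = new Timer();
        Handle handle = new Handle(timer::cancel);
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                // skip emissions once the consumer has unsubscribed
                if (!handle.isUnsubscribed()) {
                    onNext.accept("tick! tack!");
                }
            }
        }, 0L, periodMillis);
        return handle;
    }

    public static void main(String[] args) throws InterruptedException {
        Handle handle = tick(1000L, System.out::println);
        Thread.sleep(2500);
        handle.unsubscribe(); // cancels the timer thread, so the JVM can exit
    }
}
```

In RxJava itself the same effect comes from registering the cleanup with subscriber.add, as shown above; the sketch only illustrates why tying timer.cancel() to the unsubscribe call is sufficient.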
You can check whether the subscriber has unsubscribed before sending it a new value. If it has, stop the timer:
Subscription subscription = Observable
    .create(subscriber -> {
        Timer timer = new Timer();
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                if (subscriber.isUnsubscribed()) {
                    // nobody is listening any more: stop the timer
                    timer.cancel();
                } else {
                    subscriber.onNext("tick! tack!");
                }
            }
        };
        timer.scheduleAtFixedRate(task, 0L, 1000L);
    })
    .subscribe(System.out::println);
So, combining the approaches above gets the job done:
public class Main {
    public static void main(String[] args) throws InterruptedException {
        Subscription subscription = Observable.create(subscriber -> {
            Timer timer = new Timer();
            // cancel the timer as soon as the subscriber unsubscribes
            subscriber.add(Subscriptions.create(() -> {
                timer.cancel();
            }));
            TimerTask task = new TimerTask() {
                @Override
                public void run() {
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onNext("tick! tack!");
                    }
                }
            };
            timer.scheduleAtFixedRate(task, 0L, 1000L);
        }).subscribe(System.out::println);
        System.err.println("finishing");
        subscription.unsubscribe();
        // keep the JVM alive to confirm that no further ticks are printed
        Thread.sleep(10000);
    }
}
This is a more idiomatic RxJava example that does the same thing with much less code:
Subscription subscription = Observable
    .interval(1000, TimeUnit.MILLISECONDS)
    .map(n -> "tick! tack!")
    .subscribe(System.out::println);
Thread.sleep(3000);
System.err.println("finishing");
subscription.unsubscribe();
I was looking for a similar solution and found this approach in the RxAndroid code.
It looks like you don't need to use Observable.using to perform the cleanup step. Instead, you can add another subscription to the subscriber that performs your cleanup:
Observable.create(subscriber -> {
    Timer timer = new Timer();
    TimerTask task = new TimerTask() {
        @Override
        public void run() {
            subscriber.onNext("tick! tack!");
        }
    };
    subscriber.add(Subscriptions.create(() -> {
        timer.cancel();
    }));
    timer.scheduleAtFixedRate(task, 0L, 1000L);
})