How to clean up an Observable created with the Observable.create(OnSubscribe) method

I have the following code that creates a custom Observable using the Observable.create(OnSubscribe) method:

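import java.util.Scanner;
import java.util.Timer;
import java.util.TimerTask;

import rx.Observable;
import rx.Subscription;

public class Main {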

    public static void main(String[] args) {
        Subscription subscription = Observable
                .create(subscriber -> {
                    Timer timer = new Timer();
                    TimerTask task = new TimerTask() {

                        @Override
                        public void run() {
                            subscriber.onNext("tick! tack!");
                        }
                    };
                    timer.scheduleAtFixedRate(task, 0L, 1000L);
                })
                .subscribe(System.out::println);

        new Scanner(System.in).nextLine();
        System.err.println("finishing");

        subscription.unsubscribe();
    }
}

      

The Observable emits a string every second using a timer. When the user presses Enter, the subscription is cancelled.

However, the timer keeps running. How can I cancel the timer? I assume there is a hook somewhere, but I cannot find it.

In .NET, the create method returns an IDisposable, which could be my own implementation that stops the timer. I'm not sure how to map that to RxJava, since its subscribe method is void.

+3




5 answers


A more declarative (and IMHO easier to read) solution would be to use the Observable.using method:

Observable<String> obs = Observable.using(
    // resource factory:
    () -> new Timer(),
    // observable factory:
    timer -> Observable.create(subscriber -> {
        TimerTask task = new TimerTask() {
            public void run() {
                subscriber.onNext("tick! tack!");
            }
        };
        timer.scheduleAtFixedRate(task, 0L, 1000L);
    }),
    // dispose action:
    timer -> timer.cancel()
);

      



You declare how the dependent resource (Timer) is created, how it is used to create the Observable, and how it is disposed of. RxJava then takes care of creating the timer on subscription and cancelling it on unsubscription.
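There is a reason for creating a fresh Timer per subscription in the resource factory: a java.util.Timer cannot be reused once cancel() has been called on it. A minimal JDK-only sketch of that behaviour follows; the class and method names are made up for illustration, and no RxJava is involved.

```java
import java.util.Timer;
import java.util.TimerTask;

// Hypothetical demo class, not part of RxJava.
class TimerCancelSketch {

    // Returns true if a cancelled Timer refuses to accept new tasks.
    static boolean rejectsScheduleAfterCancel() {
        Timer timer = new Timer(true); // daemon thread, so the JVM can exit
        timer.cancel();                // the same call the dispose action makes
        try {
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    // never reached
                }
            }, 0L);
            return false;
        } catch (IllegalStateException e) {
            // Timer.schedule throws once the timer has been cancelled
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectsScheduleAfterCancel()); // prints "true"
    }
}
```

Since a cancelled timer is unusable, sharing one Timer instance between subscriptions would break every subscription after the first unsubscribe.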

+4




You can add code that will be called when you unsubscribe from the stream. To do this, add a new Subscription to your subscriber inside the create method:



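subscriber.add(new Subscription() {
    // tracks whether unsubscribe() has been called
    private volatile boolean unsubscribed = false;

    @Override
    public void unsubscribe() {
        unsubscribed = true;
        // stop the timer here, e.g. timer.cancel() if the Timer is in scope
    }

    @Override
    public boolean isUnsubscribed() {
        // is the stream unsubscribed?
        return unsubscribed;
    }
});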

      

+3




You can check whether the subscriber has unsubscribed before sending it a new value. If it has cancelled the subscription, stop the timer:

Subscription subscription = Observable
    .create(subscriber -> {
        Timer timer = new Timer();
        TimerTask task = new TimerTask() {

            @Override
            public void run() {
                if (subscriber.isUnsubscribed()) {
                    // stop the timer; this run becomes its last execution
                    timer.cancel();
                } else {
                    subscriber.onNext("tick! tack!");
                }
            }
        };
        timer.scheduleAtFixedRate(task, 0L, 1000L);
    })
    .subscribe(System.out::println);
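One consequence of this approach is that the timer stops only on the tick after unsubscription, not at the moment unsubscribe() is called. A rough JDK-only sketch of the same check, assuming an AtomicBoolean as a stand-in for subscriber.isUnsubscribed() and a counter as a stand-in for onNext (all names here are hypothetical):

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of RxJava.
class SelfCancellingTimerSketch {

    // Runs a 10 ms ticker, flags it as unsubscribed after three ticks,
    // and returns how many ticks arrived after the timer stopped itself.
    static int ticksAfterStop() {
        AtomicBoolean unsubscribed = new AtomicBoolean(false); // stand-in for isUnsubscribed()
        AtomicInteger ticks = new AtomicInteger();             // stand-in for onNext(...)
        CountDownLatch firstTicks = new CountDownLatch(3);
        CountDownLatch stopped = new CountDownLatch(1);
        Timer timer = new Timer(true); // daemon thread

        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                if (unsubscribed.get()) {
                    timer.cancel(); // called from run(): no later execution happens
                    stopped.countDown();
                } else {
                    ticks.incrementAndGet();
                    firstTicks.countDown();
                }
            }
        }, 0L, 10L);

        try {
            firstTicks.await();     // wait for at least three ticks
            unsubscribed.set(true); // simulate subscription.unsubscribe()
            stopped.await();        // the next tick notices the flag and cancels
            int atStop = ticks.get();
            Thread.sleep(100L);     // room for stray ticks, if there were any
            return ticks.get() - atStop;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(ticksAfterStop()); // prints "0"
    }
}
```

With a one-second period the timer thread can therefore outlive the subscription by up to a second, which is why the other answers cancel the timer from an unsubscribe hook instead.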

      

+2




So, combining the approaches above does the job:

public class Main {

    public static void main(String[] args) throws InterruptedException {
        Subscription subscription = Observable.create(subscriber -> {
            Timer timer = new Timer();
            subscriber.add(Subscriptions.create(() -> {
                timer.cancel();
            }));
            TimerTask task = new TimerTask() {

                @Override
                public void run() {
                    if (!subscriber.isUnsubscribed())
                        subscriber.onNext("tick! tack!");
                }
            };
            timer.scheduleAtFixedRate(task, 0L, 1000L);
        }).subscribe(System.out::println);

        System.err.println("finishing");

        subscription.unsubscribe();

        Thread.sleep(10000);
    }
}

      

This is a more idiomatic RxJava example that does the same thing with much less code:

Subscription subscription = Observable
    .interval(1000, TimeUnit.MILLISECONDS)
    .map(n -> "tick! tack!")
    .subscribe(System.out::println);
Thread.sleep(3000);
System.err.println("finishing");
subscription.unsubscribe();

      

+2




I was looking for a similar solution, and found this code in RxAndroid:

https://github.com/ReactiveX/RxAndroid/blob/v0.24.0/rxandroid/src/main/java/rx/android/content/OnSubscribeBroadcastRegister.java

It looks like you don't need Observable.using to perform the cleanup step. Instead, you can add another Subscription to the subscriber that performs your cleanup.

Observable.create(subscriber -> {
  Timer timer = new Timer();
  TimerTask task = new TimerTask() {
      @Override
      public void run() {
          subscriber.onNext("tick! tack!");
      }
  };

  subscriber.add(Subscriptions.create(() -> {
    timer.cancel();
  }));

  timer.scheduleAtFixedRate(task, 0L, 1000L);
});

      

0

