How do I pause / resume monitoring?

I am trying to implement paging with modification, but I am struggling to find a way to pause the observable so that it does not continue to request pages that are not needed.

The basic question is: can I tell the observable to "pause" and "resume"? I am not talking about buffering or skipping, but I would like the source to be watched to stop completely, that is: do not make any web requests, etc.

Below is the simulation code I am working with. rangeObservable is a simulated web server "pager" and timerObservable is like receiving scroll events.

package example.wanna.be.pausable;

import java.io.IOException;
import java.lang.Throwable;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.observables.ConnectableObservable;
import rx.Subscription;
import rx.Subscriber;

public class Main {

  private static ConnectableObservable rangeObservable;

  private static void setPaused(boolean paused) {
    // How do I pause/resume rangeObservable?
  }

  public static void main(String[] args) {

    rangeObservable = Observable.range(0, Integer.MAX_VALUE).publish();
    Observable timerObservable = Observable.timer(2, 2, TimeUnit.SECONDS);

    rangeObservable.subscribe(new Subscriber<Integer>() {

      private int count = 0;

      public void onStart() {
        System.out.println("Range started");
      }

      public void onNext(Integer i) {
        System.out.println("Range: " + i);

        if (++count % 20 == 0) {
          System.out.println("Pausing");
          setPaused(true);
        }
      }

      public void onError(Throwable e) {
        e.printStackTrace();
      }

      public void onCompleted() {
        System.out.println("Range done");
      }

    });

    timerObservable.subscribe(new Subscriber<Long>() {

      public void onStart() {
        System.out.println("Time started");

        // I dont know where to put this
        // rangeObservable.connect();
      }

      public void onNext(Long i) {
        System.out.println("Timer: " + i);
        setPaused(false);
      }

      public void onError(Throwable e) {
        e.printStackTrace();
      }

      public void onCompleted() {
        System.out.println("Timer done");
      }

    });

    // for some reason I have to do this or it just exits immediately
    try {
      System.in.read();
    } catch(IOException e) {
      e.printStackTrace();
    }

  }

}

      

Any guidance would be greatly appreciated!

+3


source to share


1 answer


You want to keep your subscription and unsubscribe / unsubscribe from it (haven't tested it fully, but I think it should work, most of my changes to setPaused might need to fix duplicate code):

package example.wanna.be.pausable;

import java.io.IOException;
import java.lang.Throwable;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.observables.ConnectableObservable;
import rx.Subscription;
import rx.Subscriber;

public class Main {

  private static ConnectableObservable rangeObservable;
  Subscription mSubscription;

  private static void setPaused(boolean pause) {
    if (pause) {
        mSubscription.unsubscribe()
    } else {
        mSubscription.subscribe(new Subscriber<Integer>() {

      private int count = 0;

      public void onStart() {
        System.out.println("Range started");
      }

      public void onNext(Integer i) {
        System.out.println("Range: " + i);

        if (++count % 20 == 0) {
          System.out.println("Pausing");
          setPaused(true);
        }
      }

      public void onError(Throwable e) {
        e.printStackTrace();
      }

      public void onCompleted() {
        System.out.println("Range done");
      }

    });
  }

  public static void main(String[] args) {

    rangeObservable = Observable.range(0, Integer.MAX_VALUE).publish();
    Observable timerObservable = Observable.timer(2, 2, TimeUnit.SECONDS);

    mSubscription = rangeObservable.subscribe(new Subscriber<Integer>() {

      private int count = 0;

      public void onStart() {
        System.out.println("Range started");
      }

      public void onNext(Integer i) {
        System.out.println("Range: " + i);

        if (++count % 20 == 0) {
          System.out.println("Pausing");
          setPaused(true);
        }
      }

      public void onError(Throwable e) {
        e.printStackTrace();
      }

      public void onCompleted() {
        System.out.println("Range done");
      }

    });

    timerObservable.subscribe(new Subscriber<Long>() {

      public void onStart() {
        System.out.println("Time started");

        // I dont know where to put this
        // rangeObservable.connect();
      }

      public void onNext(Long i) {
        System.out.println("Timer: " + i);
        setPaused(false);
      }

      public void onError(Throwable e) {
        e.printStackTrace();
      }

      public void onCompleted() {
        System.out.println("Timer done");
      }

    });

    // for some reason I have to do this or it just exits immediately
    try {
      System.in.read();
    } catch(IOException e) {
      e.printStackTrace();
    }

  }

}

      

Also at the end you have:



// for some reason I have to do this or it just exits immediately
        try {
          System.in.read();
        } catch(IOException e) {
          e.printStackTrace();
        }

      

The reason for this is that in the main () function the subscriptions are started asynchronously, so your program reaches the end of main and then exits because there is no more code to run (since your observables are running on a different thread).

0


source







All Articles