Best practices for exposing "expensive" Observables in RxJava

I am new to RxJava and am trying to identify common idioms and best practices.

Let's say I have a class Foo that emits Bars (incomplete and oversimplified for the moment):

class Foo {
    public Subscriber barSubscriber;
    public Observable<Bar> getBarObservable = (...details omitted...)

    private void someMethod() {
        // emit a Bar
        barSubscriber.onNext(bar); 
    }
}

      

Other objects that want to subscribe to those Bars do so by calling:

foo.getBarObservable().subscribe(...);

      

Let's say that producing and emitting Bars is "expensive". To avoid doing this work when there are no more subscribers, Foo's getBarObservable could expose a connectable, ref-counted Observable, like this (using share()):

class Foo {
    private Subscriber barSubscriber;
    private Observable<Bar> barObservable =  Observable.create(
            new Observable.OnSubscribe<Bar>() {
                @Override
                public void call(Subscriber<? super Bar> subscriber) {
                    Foo.this.subscriber = subscriber;
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            runUntilUnsubscribed();
                        }
                    }).start();

                }
            }
    ).share();

    public Observable<Bar> getBarObservable() {
        return barObservable;
    }

    public void runUntilUnsubscribed() {
        while(!subscriber.isUnsubscribed()) {

            /* do some heavy stuff that produces a Bar.  If, when a 
               Bar is ready, we still have subscribers, emit the Bar */

            if (!subscriber.isUnsubscribed()) 
                subscriber.onNext(bar);
        }
    }
}

      

Most of the examples and tutorials I've seen create Observables inline, on the fly, in the same piece of code that subscribes to them, so it's not clear to me what the standard practice is in a more real-world scenario where creating the Observable and subscribing to it happen in two different places.

  • For a class like Foo that doesn't want to know who its subscribers are or how many subscribers it will have, is this the right approach?
  • It seems to me that this would be a very typical scenario. Is it? Or, at a high level, is this not the right way to think about exposing Observables? Are there drawbacks to using this approach routinely?
  • I feel like I need this little if (subscriber == null && !subscriber.isUnsubscribed()) subscriber.onNext(bar); pattern every time I want to emit a Bar. Is that also a common idiom, or is there a better way?
    Nevermind, I don't need the null check ... not sure what I was thinking there.


2 answers


Your example class can't really work: setBar can throw an NPE if subscriber is null, and runUntilUnsubscribed references a missing bar field/value and is a busy loop that would emit the same value over and over.

You say creating a Bar is expensive, but its creation seems to happen outside of the Foo class, and I think you'd like to dispatch such a value to the currently subscribed Subscribers. This is what PublishSubject is for:

class Foo {
    final PublishSubject<Bar> subject = PublishSubject.create();
    public void setBar(Bar bar) {
        subject.onNext(bar);
    }
    public Observable<Bar> getBarObservable() {
        return subject; // .asObservable() if you want to hide the subject
    }
}

      

If there are no subscribers, the Bar that was set will simply be dropped and garbage collected. If you want to retain the last value, use BehaviorSubject instead of PublishSubject.
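The difference between dropping a value and retaining the last one can be modeled in plain Java, without RxJava on the classpath. The sketch below is only a toy stand-in and not RxJava's implementation: LastValueRelay, its replayLast flag, and RelayDemo are hypothetical names invented for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy model only (hypothetical class, not part of RxJava).
// replayLast = false: values emitted while nobody listens are lost,
//                     roughly like a PublishSubject.
// replayLast = true:  a new listener first receives the latest value,
//                     roughly like a BehaviorSubject.
class LastValueRelay<T> {
    private final List<Consumer<T>> listeners = new ArrayList<>();
    private final boolean replayLast;
    private T last;
    private boolean hasLast;

    LastValueRelay(boolean replayLast) {
        this.replayLast = replayLast;
    }

    // Remembers the value if configured to, then delivers it to current listeners.
    synchronized void onNext(T value) {
        if (replayLast) {
            last = value;
            hasLast = true;
        }
        for (Consumer<T> listener : new ArrayList<>(listeners)) {
            listener.accept(value);
        }
    }

    // Replays the retained value (if any) before registering the listener.
    synchronized void subscribe(Consumer<T> listener) {
        if (replayLast && hasLast) {
            listener.accept(last);
        }
        listeners.add(listener);
    }
}

class RelayDemo {
    public static void main(String[] args) {
        List<String> seenByPublishLike = new ArrayList<>();
        List<String> seenByBehaviorLike = new ArrayList<>();
        LastValueRelay<String> publishLike = new LastValueRelay<>(false);
        LastValueRelay<String> behaviorLike = new LastValueRelay<>(true);

        publishLike.onNext("bar1");    // nobody is listening yet
        behaviorLike.onNext("bar1");
        publishLike.subscribe(seenByPublishLike::add);
        behaviorLike.subscribe(seenByBehaviorLike::add);
        publishLike.onNext("bar2");
        behaviorLike.onNext("bar2");

        System.out.println(seenByPublishLike);   // [bar2]
        System.out.println(seenByBehaviorLike);  // [bar1, bar2]
    }
}
```

The late subscriber of the publish-like relay never sees bar1, while the behavior-like relay hands it over on subscription.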

Otherwise, if you need to trigger the creation of the expensive Bar values when a subscriber arrives, you can use some jump-start sequence with share():



Observable.just(1)
.subscribeOn(Schedulers.computation())
.map(v -> createBar())
.share();

      

But the use of share() really depends on the intended lifecycle of each Bar value.

For example, if you want to store the Bar until subscribers arrive, then do the heavy computation once and dispatch the result, you can use the following construct:

class Foo {
    final BehaviorSubject<Bar> subject = BehaviorSubject.create();
    final Observable<Bar> output = subject
        .observeOn(Schedulers.computation())
        .doOnNext(bar -> expensiveInplaceComputation(bar))
        .take(1)
        .share();

    public void setBar(Bar bar) {
        subject.onNext(bar);
    }
    public Observable<Bar> getBarObservable() {
        return output;
    }
}

      

See this gist for a runnable example.



  • Yes, this is roughly the correct approach. If the Bars in Foo need to be shared with all subscribers, use .publish().refCount() (or share(), as you said). If not, then use a regular Observable, which is cold by default.

  • Exposing Observables is a common scenario. In a good reactive architecture, most classes have only getters of Observables, because setters are inherently not reactive. Given a program or class that works with setters, you can usually convert it to Observables and getters without affecting functionality. Observables and getters are a desirable approach because of a certain inversion of control. With setters, if Foo sets a value in Baz, you need to look at the class Foo whenever you want to understand Baz. But with Observables and getters, Baz gets its values from Foo, Baz defines how it works, and Foo can ignore Baz.

  • I've never had to use that if pattern. I also rarely need Observable.create(). There are many Observable creation helpers (from, interval, range, just, to name a few) and Observable transformations (such as the all-powerful flatMap) that let you go very far in expressing new Observables. Subjects also let you manually create Observables on the fly.
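The ref-counting idea behind .publish().refCount() from the first point can be sketched in plain Java: start the expensive work when the first subscriber arrives and stop it when the last one leaves. This is a minimal illustration under stated assumptions, not how RxJava implements it; RefCountedSource, its two callbacks, and RefCountDemo are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Toy stand-in for a ref-counted source (hypothetical class, not part of RxJava).
class RefCountedSource<T> {
    private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();
    private final Runnable onFirstSubscriber;   // e.g. start the expensive producer
    private final Runnable onLastUnsubscribed;  // e.g. stop the expensive producer

    RefCountedSource(Runnable onFirstSubscriber, Runnable onLastUnsubscribed) {
        this.onFirstSubscriber = onFirstSubscriber;
        this.onLastUnsubscribed = onLastUnsubscribed;
    }

    // Registers a listener and returns a handle that removes it again.
    synchronized Runnable subscribe(Consumer<T> listener) {
        listeners.add(listener);
        if (listeners.size() == 1) {
            onFirstSubscriber.run();
        }
        return () -> unsubscribe(listener);
    }

    // Removing the last listener triggers the stop callback exactly once.
    private synchronized void unsubscribe(Consumer<T> listener) {
        if (listeners.remove(listener) && listeners.isEmpty()) {
            onLastUnsubscribed.run();
        }
    }

    // Delivers one value to every current listener.
    void emit(T value) {
        for (Consumer<T> listener : listeners) {
            listener.accept(value);
        }
    }
}

class RefCountDemo {
    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        RefCountedSource<String> source = new RefCountedSource<>(
                () -> log.append("start;"),
                () -> log.append("stop;"));

        Runnable first = source.subscribe(v -> log.append("a=").append(v).append(';'));
        Runnable second = source.subscribe(v -> log.append("b=").append(v).append(';'));
        source.emit("bar1");
        first.run();           // one subscriber remains, work keeps running
        source.emit("bar2");
        second.run();          // last subscriber gone, work stops

        System.out.println(log);  // start;a=bar1;b=bar1;b=bar2;stop;
    }
}
```

The producer is started once for both subscribers and stopped only after the second one unsubscribes, which is the behavior the question wants from share().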


