Best practices for exposing "expensive" Observables in RxJava
I am new to RxJava and am trying to determine the common idioms and best practices.
Let's say I have a class Foo that emits Bars (incomplete and oversimplified for the moment):
class Foo {
    public Subscriber<? super Bar> barSubscriber;

    public Observable<Bar> getBarObservable() { (...details omitted...) }

    private void someMethod() {
        // emit a Bar
        barSubscriber.onNext(bar);
    }
}
Other objects that want to subscribe to those Bars do so by calling
foo.getBarObservable().subscribe(...);
Let's say that producing and emitting Bars is "expensive". To avoid doing this work when there are no more subscribers, Foo's getBarObservable() can expose a connectable, ref-counted Observable, like this (using share()):
class Foo {
    private Subscriber<? super Bar> subscriber;

    private Observable<Bar> barObservable = Observable.create(
            new Observable.OnSubscribe<Bar>() {
                @Override
                public void call(Subscriber<? super Bar> subscriber) {
                    Foo.this.subscriber = subscriber;
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            runUntilUnsubscribed();
                        }
                    }).start();
                }
            }
    ).share();

    public Observable<Bar> getBarObservable() {
        return barObservable;
    }

    public void runUntilUnsubscribed() {
        while (!subscriber.isUnsubscribed()) {
            /* do some heavy stuff that produces a Bar. If, when a
               Bar is ready, we still have subscribers, emit the Bar */
            if (!subscriber.isUnsubscribed())
                subscriber.onNext(bar);
        }
    }
}
Most of the examples and tutorials I've seen create Observables inline, on the fly, in the same piece of code that subscribes to them, so it's not clear to me what the standard practice is in the more realistic scenario where creating the Observable and subscribing to it happen in two different places.
- For a class like Foo that doesn't want to know who its subscribers are or how many subscribers it will have, is this the right approach?
- It seems to me that this would be a very typical scenario. Is it? Or, at a high level, is this not the right way to think about exposing Observables? Are there any downsides to using this approach routinely?
- I feel like I need this little if (subscriber != null && !subscriber.isUnsubscribed()) subscriber.onNext(bar); pattern every time I want to emit a Bar. Is this a common idiom too, or is there a better way? (Never mind, I don't need the null check; not sure what I was thinking there.)
Your example class can't really work: setBar can throw an NPE if the subscriber is null, and runUntilUnsubscribed references a missing bar field/value and is a busy loop that would emit the same value over and over.
You say creating a Bar is expensive, but its creation seems to happen outside the Foo class, and I think you would like to dispatch such a value to the currently subscribed Subscribers. This is what PublishSubject is for:
import rx.Observable;
import rx.subjects.PublishSubject;

class Foo {
    final PublishSubject<Bar> subject = PublishSubject.create();

    public void setBar(Bar bar) {
        subject.onNext(bar);
    }

    public Observable<Bar> getBarObservable() {
        return subject; // .asObservable() if you want to hide the subject
    }
}
If there are no subscribers, the Bar that was set will simply be dropped and garbage collected. If you want to retain the last value, use BehaviorSubject instead of PublishSubject.
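To make the difference concrete, here is a minimal library-free sketch of the two behaviours. PublishLike and BehaviorLike are hypothetical stand-ins written for illustration only; they are not RxJava's implementation, they are not thread-safe, and they ignore errors, completion and unsubscription.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class SubjectSketch {
    /** Hypothetical publish-style subject: keeps no memory of past values. */
    static class PublishLike<T> {
        private final List<Consumer<T>> listeners = new ArrayList<>();

        void subscribe(Consumer<T> listener) {
            listeners.add(listener);
        }

        void onNext(T value) {
            // With no listeners registered, the value is simply dropped.
            for (Consumer<T> listener : listeners) {
                listener.accept(value);
            }
        }
    }

    /** Hypothetical behavior-style subject: replays the latest value to new subscribers. */
    static class BehaviorLike<T> extends PublishLike<T> {
        private T latest;
        private boolean hasValue;

        @Override
        void subscribe(Consumer<T> listener) {
            super.subscribe(listener);
            if (hasValue) {
                listener.accept(latest); // late subscriber still sees the last value
            }
        }

        @Override
        void onNext(T value) {
            latest = value;
            hasValue = true;
            super.onNext(value);
        }
    }

    /** Emits "early" before anyone subscribes, then "late" afterwards. */
    static List<String> run(PublishLike<String> subject) {
        List<String> received = new ArrayList<>();
        subject.onNext("early");
        subject.subscribe(received::add);
        subject.onNext("late");
        return received;
    }

    static List<String> publishDemo() {
        return run(new PublishLike<String>());
    }

    static List<String> behaviorDemo() {
        return run(new BehaviorLike<String>());
    }

    public static void main(String[] args) {
        System.out.println(publishDemo());  // prints [late]
        System.out.println(behaviorDemo()); // prints [early, late]
    }
}
```

The publish-style variant loses the value emitted before subscription, while the behavior-style variant hands it to the first subscriber that shows up.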
Otherwise, if you need to trigger the creation of the expensive Bar values when a subscriber arrives, you can use some jump-start sequence with share():
Observable.just(1)
    .subscribeOn(Schedulers.computation())
    .map(v -> createBar())
    .share();
But the use of share() really depends on the intended lifecycle of each Bar value.
For example, if you want to store the Bar until subscribers arrive, then do the heavy computation once and dispatch the result, you can use the following construct:
import rx.Observable;
import rx.schedulers.Schedulers;
import rx.subjects.BehaviorSubject;

class Foo {
    // Holds the latest Bar until a subscriber arrives
    final BehaviorSubject<Bar> subject = BehaviorSubject.create();

    final Observable<Bar> output = subject
            .observeOn(Schedulers.computation())
            .doOnNext(bar -> expensiveInplaceComputation(bar))
            .take(1)
            .share();

    public void setBar(Bar bar) {
        subject.onNext(bar);
    }

    public Observable<Bar> getBarObservable() {
        return output;
    }
}
See this gist for a runnable example.
- Yes, this is roughly the correct approach. If the Bars in Foo need to be shared with all subscribers, use .publish().refCount() (or share(), as you said). If not, use a regular Observable, which is cold by default.
- Exposing Observables is a common scenario. In a good reactive architecture, most classes have only getters of Observables, because setters are inherently not reactive. Given a program or class that works with setters, you can usually convert it to Observables and getters without affecting functionality. Observables and getters are the desirable approach because of the inversion of control they bring. With setters, if Foo sets a value in Baz, you need to look at class Foo whenever you want to understand Baz. But with Observables and getters, Baz gets its values from Foo, Baz defines how it itself works, and Foo can be ignorant of Baz.
- I have never needed to use that if pattern. I also rarely need Observable.create(). There are many Observable creation helpers (from, interval, range, just, to name a few) and Observable transformations (such as the all-powerful flatMap) that let you go a long way in expressing new Observables. Subjects also allow you to manually create Observables on the go.
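As a rough illustration of what ref-counted sharing means in the first point, here is a plain-Java sketch with no RxJava dependency. SharedSource, its start/stop hooks and emit are hypothetical names invented for this example; the real publish().refCount() also deals with errors, completion and scheduling, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class RefCountSketch {
    /** Hypothetical source that runs its expensive work only while it has subscribers. */
    static class SharedSource<T> {
        private final List<Consumer<T>> listeners = new ArrayList<>();
        private final Runnable start;
        private final Runnable stop;

        SharedSource(Runnable start, Runnable stop) {
            this.start = start;
            this.stop = stop;
        }

        /** Registers a listener and returns a handle that unsubscribes it when run. */
        synchronized Runnable subscribe(Consumer<T> listener) {
            listeners.add(listener);
            if (listeners.size() == 1) {
                start.run(); // first subscriber: begin the expensive work
            }
            return () -> unsubscribe(listener);
        }

        private synchronized void unsubscribe(Consumer<T> listener) {
            // remove() returns false on a repeated call, so stop runs at most once
            if (listeners.remove(listener) && listeners.isEmpty()) {
                stop.run(); // last subscriber gone: halt the expensive work
            }
        }

        /** Delivers one value to every current listener. */
        synchronized void emit(T value) {
            for (Consumer<T> listener : new ArrayList<>(listeners)) {
                listener.accept(value);
            }
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        SharedSource<String> source =
                new SharedSource<>(() -> log.add("start"), () -> log.add("stop"));
        Runnable first = source.subscribe(v -> log.add("A:" + v));
        Runnable second = source.subscribe(v -> log.add("B:" + v));
        source.emit("bar");
        first.run();
        second.run();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [start, A:bar, B:bar, stop]
    }
}
```

The work starts once for the first subscriber, both subscribers receive the same Bar, and the work stops only after the last one unsubscribes.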