In RxJava, how do I create a potentially endless stream of events pushed in through an API?
I have an API available to clients that can be simplified to this:
    public class API {
        public void sendEvent(Event e) { /* ... */ }
    }
Event instances enter my system whenever a client calls the API (technically over Binder, in a class derived from Service); they are then processed, filtered, and dispatched to other internals. I don't need past events, only those that arrive after a subscriber subscribes. This seems like a natural fit for the Rx paradigm, which I'm only just getting my feet wet with.
I need an Observable that is created once, allows multiple subscribers, and can be sent Event instances, which are then delivered through a reactive pipeline to the observers. A Subject seems appropriate for what I'm looking for (specifically, this answer to this question resonated with me).
What do other RxJava users recommend?
For example, following up on my short comment:
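    import java.util.ArrayList;
    import java.util.List;
    import rx.Observable.OnSubscribe;
    import rx.Subscriber;

    public class API implements OnSubscribe<Event> {
        private final List<Subscriber<? super Event>> subscribers = new ArrayList<>();

        public void sendEvent(Event event) {
            // Do whatever you need with the event
            for (Subscriber<? super Event> sub : subscribers) {
                sub.onNext(event);
            }
        }

        // Called once per subscription; OnSubscribe<Event> requires Subscriber<? super Event>
        @Override
        public void call(Subscriber<? super Event> sub) {
            subscribers.add(sub);
        }
    }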
Then you probably have an instance somewhere: API api = ...
Your Observable is obtained as follows: Observable.create(api);
Then you can do anything you would normally do with an Observable.
Filtering out Subscribers that have unsubscribed is left as an exercise for the reader.
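One way to approach that exercise is to check each subscriber while dispatching and stop tracking the ones that have unsubscribed. The sketch below is deliberately dependency-free so that it runs without RxJava on the classpath: SketchSubscriber and PruningApi are hypothetical stand-ins, and events are plain strings. With RxJava 1.x the corresponding check would be sub.isUnsubscribed() on the real Subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for rx.Subscriber, reduced to what this sketch needs.
class SketchSubscriber {
    final List<String> received = new ArrayList<>();
    private volatile boolean unsubscribed;

    void onNext(String event) { received.add(event); }
    void unsubscribe() { unsubscribed = true; }
    boolean isUnsubscribed() { return unsubscribed; }
}

// Hypothetical variant of the API class above that prunes dead subscribers.
class PruningApi {
    // Copy-on-write list, so removing an entry while iterating is safe.
    private final List<SketchSubscriber> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(SketchSubscriber sub) { subscribers.add(sub); }

    void sendEvent(String event) {
        for (SketchSubscriber sub : subscribers) {
            if (sub.isUnsubscribed()) {
                subscribers.remove(sub); // stop tracking it
            } else {
                sub.onNext(event);
            }
        }
    }

    int subscriberCount() { return subscribers.size(); }
}
```

In this version an unsubscribed subscriber is only dropped the next time an event is dispatched; removing it eagerly at unsubscribe time is another reasonable option.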
Edit
A little more research shows that PublishSubject should help:
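    import rx.Observable;
    import rx.subjects.PublishSubject;

    public class API {
        private final PublishSubject<Event> subject = PublishSubject.create();

        public void sendEvent(Event event) {
            // Do whatever you need with the event
            // Then publish it
            subject.onNext(event);
        }

        // Expose the subject as a plain Observable so callers cannot push events into it
        public Observable<Event> getObservable() {
            return subject.asObservable();
        }
    }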
This way you can subscribe to this Observable, and every time an event is posted to the API it is published to all current subscribers.
Use it like this:
    API api = ...;
    api.getObservable().subscribe(event -> doStuffWithEvent(event));
    api.getObservable().subscribe(event -> doOtherStuffWithEvent(event));
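The behavior the question asks for (no replay of past events, and every current subscriber sees each new event) can be illustrated without RxJava. What follows is a minimal sketch of those semantics only: MiniSubject and MiniSubjectDemo are hypothetical names, not RxJava's PublishSubject, and the sketch ignores completion, errors, unsubscription, and thread safety.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of a "hot" source: no buffering, no replay.
class MiniSubject<T> {
    private final List<Consumer<? super T>> observers = new ArrayList<>();

    // Register an observer; it only sees events sent after this call.
    void subscribe(Consumer<? super T> observer) {
        observers.add(observer);
    }

    // Deliver the event to everyone who is subscribed right now.
    void onNext(T event) {
        for (Consumer<? super T> observer : observers) {
            observer.accept(event);
        }
    }
}

class MiniSubjectDemo {
    // Returns what an early and a late subscriber each received, in that order.
    static List<List<String>> run() {
        MiniSubject<String> subject = new MiniSubject<>();
        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();

        subject.subscribe(early::add);
        subject.onNext("first");   // only the early subscriber is registered
        subject.subscribe(late::add);
        subject.onNext("second");  // both subscribers are registered

        List<List<String>> result = new ArrayList<>();
        result.add(early); // [first, second]
        result.add(late);  // [second]
        return result;
    }
}
```

The late subscriber never sees "first", which matches the requirement that subscribers only receive events sent after they subscribe.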