In RxJava, how do I create a potentially endless stream of events thrown from the API?

I have an API available to clients that can be simplified to this:

public class API {
  public void sendEvent(Event e);
}

      

Event

instances enter my system whenever a client calls an API (technically over Binder in a derivative Service

), which are then processed, filtered, and sent to other internals. I don't need past events, only those that are available since the subscriber subscribes. This is like the natural form for the Rx paradigm where I just wet my feet.

I need an Observable that is created once, allows multiple subscribers, and instances can be sent Event

, which are then sent via a reactive pipeline to the observers. A Subject

seems appropriate for what I'm looking for (specifically, this answer to this question resonated with me).

What do other RxJava users recommend?

+3


source to share


2 answers


For example, following my short comment:

public class API implements OnSubscribe<Event> {
    private List<Subscriber<Event>> subscribers = new ArrayList<>();

    public void sendEvent(Event event) {
        // Do whatever you need with the event
        for (Subscriber<Event> sub : subscribers) {
            sub.onNext(event);
        }
    }
    public void call(Subscriber<Event> sub) {
        subscribers.add(sub);
    }
}

      

Then you probably have an instance somewhere: API api = ...

Your Observation is obtained as follows: Observable.create(api);

Then you can do any normal thing that you do with the Observation .

Filtering canceled subscriptions is Subscriber

left as an exercise for the reader.

Edit



A little more research shows what PublishSubject

should help:

public class API {
    private PublishSubject<Event> subject = PublishSubject.create();

    public void sendEvent(Event event) {
        // Do whatever you need with the event
        // Then publish it
        subject.onNext(event);
    }
    public Observable<Event> getObservable() {
        return subject.asObservable();
    }
}

      

This way you can subscribe to this Observable and every time an event is posted to API

it is published to all subscribers.

Use this:

API api = ...;
api.getObservable().subscribe(event -> doStuffWithEvent(event));
api.getObservable().subscribe(event -> doOtherStuffWithEvent(event));

      

+6


source


Try observable.share()

that under the covers calls .publish().refCount()

. It will only use one basic subscription and give you the multi-subscription job you specified.



+1


source







All Articles