RxJS: Correct way to manually emit observable
Working with RxJS in Angular 4.x, I see two very different patterns for generating Observables from user initiated action streams. One thread is the direct result of the user clicking the add item button, which generates a new object. Another is a series of events emitted by some third code that I am using.
I want to combine these two streams using something like "combLatest" to create a single Observable.
With my button, I followed the following pattern:
const signal = Observable.create(
(observer) => {
this.additem= (item) => observer.next(item);
}
);
this.item$ = signal.map((item) => [item])
.scan((accumulator, value) => {
return accumulator.concat(value);
});
However, I see a lot of information that I should instead use the themes I am trying to use with my callback:
sort$ = new Subject();
sortChange(sort){
sort$.next(sort);
}
Then I try to combine them like this:
combine$ = Observable.combineLatest(sort$, item$,
(sort, items) => {
return "something that does stuff with these";}
);
My questions are - what is the preferred pattern for manually generating streams? Can / observables and subjects be merged into a single observable, as I am trying to do here?
source to share
You can of course combine Observables and Subjects into one thread.
I think the question here is which makes more sense to you. From your description, when implementing something like "add item" functionality, I would prefer Subject
over Observable.create
.
This is because every time you subscribe to yours signal
, you are reassigning this.additem
. The callback Observable.create
is called for each observer. Note that a more correct usage Observable.create
would look like this:
const signal = Observable.create((observer) => {
this.additem = (item) => observer.next(item);
return () => this.additem = null;
});
The returned callback () => this.additem = null
is called when you unsubscribe from this Observable and that's where you should handle all the cleanup.
However, if you do two subscriptions signal
, you override this.additem
twice, and then if you decide to unsubscribe from one of the observers, you would this.additem = null
, and this will probably lead to unexpected behavior.
Therefore, in this case it makes sense to use Subject
. For example, for example:
const subject = new Subject();
this.additem = (item) => subject.next(item);
If you'd like to see a more real-world example Observable.create
, see, for example, the following: Subscribe to a stream using RxJS and the twitter-stream-api module
Edit: Also check out these articles from the RxJS 5 lead developer:
source to share