RxJS: Correct way to manually emit observable

Working with RxJS in Angular 4.x, I see two very different patterns for generating Observables from streams of user-initiated actions. One stream is the direct result of the user clicking the "add item" button, which generates a new object. The other is a series of events emitted by some third-party code that I am using.

I want to combine these two streams using something like "combineLatest" to create a single Observable.

For my button, I used the following pattern:

const signal = Observable.create((observer) => {
    this.additem = (item) => observer.next(item);
});

this.item$ = signal.map((item) => [item])
    .scan((accumulator, value) => {
        return accumulator.concat(value);
    });

      

However, I see a lot of advice saying that I should use Subjects instead, which is what I am trying with my callback:

sort$ = new Subject();

sortChange(sort) {
    this.sort$.next(sort);
}

      

Then I try to combine them like this:

combine$ = Observable.combineLatest(this.sort$, this.item$,
    (sort, items) => {
        return "something that does stuff with these";
    }
);

      

My questions are: what is the preferred pattern for manually generating streams? And can Observables and Subjects be combined into a single Observable, as I am trying to do here?

1 answer


You can of course combine Observables and Subjects into one stream.
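A Subject is itself an Observable in RxJS, so combineLatest accepts either kind of source. As a rough illustration of that idea, here is a dependency-free sketch. MiniSource, MiniSubject, createSource and combineLatestModel are hypothetical stand-ins written for this example, not the RxJS classes or operators.

```typescript
// Simplified stand-ins, NOT the RxJS classes: just enough to show the idea.
type Listener<T> = (value: T) => void;

interface MiniSource<T> {
  subscribe(listener: Listener<T>): void;
}

// Subject-like: values are pushed in from outside via next().
class MiniSubject<T> implements MiniSource<T> {
  private listeners: Listener<T>[] = [];
  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
  }
  next(value: T): void {
    this.listeners.forEach((listener) => listener(value));
  }
}

// Observable.create-like: the producer function runs once per subscriber.
function createSource<T>(producer: (listener: Listener<T>) => void): MiniSource<T> {
  return { subscribe: (listener) => producer(listener) };
}

// combineLatest-like: emits once both inputs have emitted at least once,
// then again whenever either input emits.
function combineLatestModel<A, B, R>(
  a: MiniSource<A>,
  b: MiniSource<B>,
  project: (a: A, b: B) => R
): MiniSource<R> {
  return createSource<R>((listener) => {
    let latestA: A | undefined;
    let latestB: B | undefined;
    let hasA = false;
    let hasB = false;
    const emit = () => {
      if (hasA && hasB) listener(project(latestA as A, latestB as B));
    };
    a.subscribe((value) => { latestA = value; hasA = true; emit(); });
    b.subscribe((value) => { latestB = value; hasB = true; emit(); });
  });
}

const sort$ = new MiniSubject<string>();
let addItem: (item: string) => void = () => {};
const item$ = createSource<string>((listener) => { addItem = listener; });

const combined: string[] = [];
combineLatestModel(sort$, item$, (sort, item) => `${sort}:${item}`)
  .subscribe((value) => combined.push(value));

sort$.next("asc");  // nothing emitted yet: item$ has not produced a value
addItem("apple");   // combined is now ["asc:apple"]
sort$.next("desc"); // combined is now ["asc:apple", "desc:apple"]
```

The combining function only relies on subscribe(), which is why it does not care whether a source was built with a create-style function or is a Subject.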

I think the question here is which makes more sense to you. From your description, when implementing something like "add item" functionality, I would prefer Subject over Observable.create.

This is because every time you subscribe to your signal, you are reassigning this.additem. The callback passed to Observable.create is called for each observer. Note that a more correct usage of Observable.create would look like this:

const signal = Observable.create((observer) => {
   this.additem = (item) => observer.next(item);
   return () => this.additem = null;
});

      

The returned callback () => this.additem = null is called when you unsubscribe from this Observable, and that's where you should handle all the cleanup.

However, if you make two subscriptions to signal, you will overwrite this.additem twice, and then if you decide to unsubscribe one of the observers, that will set this.additem = null, which will probably lead to unexpected behavior.
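To make that failure mode concrete, here is a small dependency-free sketch. createModel is a hypothetical stand-in that only mimics the per-subscriber behavior of Observable.create described above (it is not the RxJS implementation), and the component object is an assumed substitute for `this`.

```typescript
// Simplified stand-in for Observable.create, NOT the RxJS implementation:
// the setup function runs once per subscribe() call and returns a teardown.
type Listener<T> = (value: T) => void;
type Teardown = () => void;

function createModel<T>(setup: (listener: Listener<T>) => Teardown) {
  return { subscribe: (listener: Listener<T>): Teardown => setup(listener) };
}

// Plays the role of the component holding `this.additem` (hypothetical).
const component: { additem: Listener<string> | null } = { additem: null };

const signal = createModel<string>((listener) => {
  component.additem = listener;               // reassigned on every subscription
  return () => { component.additem = null; }; // teardown clears the shared reference
});

const first: string[] = [];
const second: string[] = [];
const unsubscribeFirst = signal.subscribe((item) => first.push(item));
signal.subscribe((item) => second.push(item));

// Only the most recent subscriber is reachable through additem.
if (component.additem) component.additem("apple");
// first is still [], second is ["apple"]

// Unsubscribing the first observer clears additem for everyone,
// even though the second observer is still subscribed.
unsubscribeFirst();
```

After the last line, component.additem is null, so the second subscriber can no longer receive items.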



Therefore, in this case it makes sense to use Subject. For example:

const subject = new Subject();
this.additem = (item) => subject.next(item);
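For comparison, here is a dependency-free sketch of why the Subject approach avoids the problem: the emit function is assigned once, and each subscriber can come and go independently. MiniSubject is a hypothetical stand-in written for this example, not the RxJS Subject class.

```typescript
// Simplified stand-in for Subject, NOT the RxJS class.
type Listener<T> = (value: T) => void;

class MiniSubject<T> {
  private listeners: Listener<T>[] = [];

  // Registers a listener and returns a function that removes only that listener.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener);
    };
  }

  // Pushes a value to every currently registered listener.
  next(value: T): void {
    this.listeners.forEach((listener) => listener(value));
  }
}

const subject = new MiniSubject<string>();
// Assigned once; subscribing or unsubscribing never touches it.
const addItem = (item: string) => subject.next(item);

const first: string[] = [];
const second: string[] = [];
const unsubscribeFirst = subject.subscribe((item) => first.push(item));
subject.subscribe((item) => second.push(item));

addItem("apple");   // both subscribers receive it
unsubscribeFirst(); // removes only the first subscriber
addItem("pear");    // the second subscriber still receives it
```

Here first ends up as ["apple"] and second as ["apple", "pear"], which is the multicast behavior the answer relies on.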

      

If you'd like to see a more real-world example of Observable.create, see, for example, the following: Subscribe to a stream using RxJS and the twitter-stream-api module

Edit: Also check out these articles from the RxJS 5 lead developer:
