RxJS: How to emit the original values and then reduce after completion?

I would like to emit all the original values from an RxJS stream and then emit a summary once it completes.

reduce suppresses the original values and emits only the final result. scan emits each running total rather than the original values.
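To make the difference concrete, here is a plain-JavaScript analogy that uses arrays instead of RxJS. It is only a sketch: `scanTotals` is a hypothetical helper written for illustration, not a library function.

```javascript
const values = [1, 2, 3];

// reduce-style: only the final total survives, the original values are gone
const reduced = values.reduce((acc, x) => acc + x, 0);

// scan-style (hypothetical helper): one running total per input value
function scanTotals(xs) {
  let acc = 0;
  return xs.map(x => (acc += x));
}
const scanned = scanTotals(values);

// desired: the untouched originals, followed by a single summary
const wanted = [...values, { total: reduced }];

console.log(reduced); // 6
console.log(scanned); // [ 1, 3, 6 ]
console.log(wanted);  // [ 1, 2, 3, { total: 6 } ]
```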

Here's my hacky solution:

let total = {
  total: 0
};

Rx.Observable.range(1, 3)
  .do(val => {
    total.total += val;
  })
  .concat(Rx.Observable.of(total))
  .subscribe(
    value => {
      console.log('Next:', value)
    }
  );

// Next: 1
// Next: 2
// Next: 3
// Next: { total: 6 }

      

What's an easy way to do this with pure RxJS streams?

+3




3 answers


Use multicast



Rx.Observable.range(1, 3)
  .multicast(new Rx.Subject(), shared => {
    return Rx.Observable.merge(shared, shared.reduce((acc, x) => acc + x, 0));
  })
  .subscribe(x => console.log(x));

      

+4




Alternatively, you can avoid share() and the two Observable chains it requires, and build a single chain instead:

Observable.range(1, 3)
    .concat(Observable.of(null))
    .scan((obj, curr) => {
        if (curr !== null) {  // skip only the sentinel, so falsy values such as 0 are kept
            obj.acc.push(curr);
        }
        obj.curr = curr;
        return obj;
    }, { acc: [], curr: 0 })
    .map(obj => obj.curr === null
        ? { total: (obj.acc.reduce((acc, curr) => acc + curr, 0)) }  // count total
        : obj.curr  // just return current item
    )
    .subscribe(console.log);

      

This prints the output you expect:

1
2
3
{ total: 6 }
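The chain above stores every value in `acc` only to sum them at the end. The same single-pass idea can keep just a running total. A library-free sketch with a generator, where `withTotal` is a hypothetical name and a plain iterable stands in for the Observable:

```javascript
// Hypothetical helper: yields each original value, then one summary object.
function* withTotal(values) {
  let total = 0;
  for (const v of values) {
    total += v; // update the running total as a side effect
    yield v;    // pass the original value through unchanged
  }
  yield { total }; // emitted once, after the input is exhausted
}

const out = [...withTotal([1, 2, 3])];
console.log(out); // [ 1, 2, 3, { total: 6 } ]
```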

      



Even though share() looks very simple to use, remember that you are actually subscribing to the source Observable twice. In practice, this may not be a problem for you, depending on which source Observable you use.

Try the following and you will see that each number is printed twice:

let source = Observable.range(1, 3).do(console.log).share();
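Why twice? As far as I understand it, concat subscribes to the reduce branch only after the first pass has completed, and by then the shared source has to be run again. The sketch below models that with a hand-rolled cold source instead of RxJS. `coldRange` and its `sideEffects` parameter are hypothetical, and the push into `sideEffects` plays the role of do(console.log).

```javascript
// Hypothetical stand-in for a cold Observable: each subscribe() re-runs the producer.
function coldRange(start, count, sideEffects) {
  return {
    subscribe(next) {
      for (let i = 0; i < count; i++) {
        const value = start + i;
        sideEffects.push(value); // side effect, executed once per subscription
        next(value);
      }
    }
  };
}

const seen = [];
const source = coldRange(1, 3, seen);

// First subscription: the pass that delivers the original values.
const received = [];
source.subscribe(v => received.push(v));

// Second subscription: the pass that computes the total.
let total = 0;
source.subscribe(v => { total += v; });

console.log(received); // [ 1, 2, 3 ]
console.log(total);    // 6
console.log(seen);     // [ 1, 2, 3, 1, 2, 3 ]
```

If the side effect is expensive (an HTTP request, for example), running the producer twice is exactly what you would want to avoid.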

      

+2




What about this?

let source = Observable.range(1, 3).share();

let totalOb = source
    .reduce((total, value) => total + value, 0);

source
    .concat(totalOb)
    .subscribe( value => console.log(`Next: ${value}`) );

      

Output:

Next: 1
Next: 2
Next: 3
Next: 6

      

You can also use throw, caught by the error callback of subscribe, to separate the data from the summary.

let source = Observable.range(1, 3).share();

let totalOb = source
    .reduce((total, value) => total + value, 0)
    .mergeMap(total => Observable.throw(total));

source
    .concat(totalOb)
    .subscribe(
        value => console.log(`Next: ${value}`),
        value => console.log(`Total: ${value}`)
    );

      

Output:

Next: 1
Next: 2
Next: 3
Total: 6

      

+1

