How do I wait for a promise to resolve and skip intermediate elements in RxJS?

Let's say that I have a stream (observable) with some elements:

--a---b-c---d--

      

If I have a function that takes one of these elements and returns a promise representing a request, and I flatMap the stream with that function, the resulting stream of responses would look something like this (uppercase letters are responses):

--a---b-c---d--
----A----B---CD

      

But this means that the request for c will start before the request for b has completed. Suppose I want to avoid executing the request for c and get this as a result:

--a---b-c---d--
----A----B----D

      

How should I approach this problem?


In the following code, I have a stream that emits after 1, 2, 4 and 7 seconds. I have a function request that takes two seconds to complete. I only want the function to be called for 1, 4 and 7 (not for 2, because the request for 1 has not completed yet).

const Rx = require('rx');

const logNext      = x => console.log(new Date(), 'Next:', x);
const logError     = x => console.log(new Date(), 'Error:', x);
const logCompleted = () => console.log(new Date(), 'Completed.');

Rx.Observable.fromArray([1, 2, 4, 7])
  .flatMap(x => Rx.Observable.of(x).delay(x * 1000))
  .flatMapFirst(request)
  .subscribe(logNext, logError, logCompleted);


function request(x) {
  console.log(`Starting request with ${x}`);
  return new Promise(resolve => {
    setTimeout(
      () => {
        console.log(`Finishing request with ${x}`);
        resolve(x)
      },
      2000
    );
  })
}

      

flatMapFirst creates the correct response stream, but I want to avoid the side effects caused by the call request(2).


1 answer


You can use flatMapFirst if you are using RxJS v4. I could not verify whether the operator exists in RxJS v5. From the documentation:

The flatMapFirst operator is similar to the flatMap and concatMap methods described above; however, rather than emitting all of the items emitted by all of the Observables that the operator generates by transforming items from the source Observable, flatMapFirst instead propagates the first Observable exclusively until it completes before it begins subscribing to the next Observable. Observables that arrive before the current Observable completes are dropped and will not propagate.

The code could be like this:

source$.flatMapFirst(makeRequest)

      

What happens is that an incoming b leads to the creation of makeRequest(b), and likewise for c. However, makeRequest(c) is not subscribed to, which means that any effects that run on subscription will not be executed.
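To make the difference between creating an observable and subscribing to it concrete, here is a minimal sketch in plain JavaScript that does not use RxJS. The object returned by makeRequest is a hypothetical stand-in for a cold observable, and the effects array exists only for illustration.

```javascript
// Records which requests actually ran their effect.
const effects = [];

// Hypothetical stand-in for a cold observable: calling makeRequest only
// builds an object; the effect runs when subscribe() is called.
function makeRequest(x) {
  return {
    subscribe(onNext) {
      effects.push(x);          // the effect happens at subscription time
      onNext(x.toUpperCase());
    }
  };
}

const forB = makeRequest('b');
const forC = makeRequest('c');  // created, but never subscribed to

forB.subscribe(value => console.log('Response:', value));
console.log(effects);           // only 'b' has run its effect
```

Creating forC costs nothing here because all of the work sits inside subscribe. This only holds when the function itself is free of side effects.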



If makeRequest itself (i.e. the function, not the observable makeRequest(x)) actually performs some effects to create its output observable, and you want to prevent this from happening, then you can use defer:

source$.flatMapFirst(x => Rx.Observable.defer(() => makeRequest(x)))
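The reason defer helps with a promise-returning function can be seen without RxJS. The sketch below assumes a request function shaped like the one in the question. The deferred helper and the started array are hypothetical; they only mimic the idea behind defer and are not part of the Rx API.

```javascript
// Records which requests have started.
const started = [];

// Same shape as the question's request(): the promise executor runs as
// soon as the function is called, so the side effect is immediate.
function request(x) {
  started.push(x);
  return new Promise(resolve => setTimeout(() => resolve(x), 20));
}

// Hypothetical helper mimicking the idea of defer: it stores a factory
// and postpones the call until start() is invoked.
function deferred(factory) {
  return { start: () => factory() };
}

const eager = request(1);                 // side effect runs right away
const lazy = deferred(() => request(2));  // nothing has run yet

console.log(started);  // [ 1 ]
lazy.start();
console.log(started);  // [ 1, 2 ]
```

Wrapping the call in a factory moves the side effect from the moment the function is called to the moment the result is actually requested, which is what lets a dropped element skip its request entirely.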

      

You can also look at previous answers for more use cases of defer.
