Chain Observable after takeWhile is not called?

I have the following methods to be called like this:

  • registerDomain

    should be called and should return operationId

  • After 10

    seconds, the getOperationDetail

    transmission should be calledoperationId

  • getOperationDetail

    should be called every 10

    seconds until returning successful

    .
  • Once getOperationDetail

    completed, createRecordSets

    should be called.
  • Finally, getChangeStatus

    should be called until it returnsINSYNC

  • If any of the api calls throws an exception, how can I handle the error on the client side?

The following code below calls registerDomain and getOperationDetail, but after getOperationDetail completes, it doesn't move to createRecordSets.

  registerDomain(domain) {
    return this._adminService.registerDomain(domain)
      .concatMap(operation => this.getOperationDetail(operation.OperationId))
      .concatMap(() => this._adminService.createRecordSets(domain));
  }

  getOperationDetail(operationId) {
    return Observable.interval(10000)
      .mergeMap(() => this._adminService.getOperationDetail(operationId))
      .takeWhile((info) => info.Status.Value !== 'SUCCESSFUL');
  }
  createRecordSets(caseWebsiteUrl) {
    return this._adminService.createRecordSets(caseWebsiteUrl.Url)
        .concatMap(registerId => this.getChangeStatus(registerId));
  }

  getChangeStatus(registerId) {
    return Observable.interval(5000)
      .mergeMap(() => this._adminService.getChange(registerId))
      .takeWhile((info) => info.ChangeInfo.Status.Value !== 'INSYNC');
  }

      

I updated getOperationDetail

to use the operator first

:

  getOperationDetail(operationId) {
    return Observable.interval(3000)
      .mergeMap(() => this._adminService.getOperationDetail(operationId))
      .first((info) =>  info.Status.Value === 'SUCCESSFUL')

  }

      

Now he actually calls createRecordSets

, but after createRecordSets

he continues to be getOperationDetail

about 13 times and eventually causes getChangeStatus

. As I looked at this, I thought it would be:

  • Call getOperationDetail

    until you receive SUCCESS

    .
  • Call createRecordSets

    once.
  • Call getChangeStatus

    until he getsINSYNC

  • Done.

Why additional calls?

I changed registerDomain to look like this:

 registerDomain(domain) {
    return this._adminService.registerDomain(domain)
      .concatMap(operation => this.getOperationDetail(operation.OperationId))
        .concatMap((op) => this.createRecordSets(op));

      

Before I got .concatMap((op) => this.createRecordSets(op))

, shackled right after this.getOperationDetail

. Once I moved it outside of this, it started working as expected. However, I'm not sure why. Can someone please explain?

+3


source to share


1 answer


When it takeWhile

matches a value that meets the specified criteria, it ends the observable without spreading the value. This means that the operator of the next chain will not receive a value and will not call a callback.

Suppose, in your example, the first two calls this._adminService.getOperationDetail(...)

result in a failure status and the third call succeeds. This means that the observable returned getOperationDetail()

will only produce two values info

, each with an unsuccessful status. And what can also be important, the next coded statement concatMap

will reference its callback for each of these failed values, which means it createRecordSets()

will be called twice. I believe you can avoid this.

I would suggest using first

:

getOperationDetail(operationId) {
    return Observable.interval(10000)
        .concatMap(() => this._adminService.getOperationDetail(operationId))
        .first(info => info.Status.Value !== 'SUCCESSFUL');
}

      

Thus, it getOperationDetail()

will only return one "successful" value as soon as this._adminService.getOperationDetail(operationId)

it succeeds. The operator first

emits the first observable value that matches the specified condition and then exits.

And when it comes to error handling, catch

or retry

may be helpful.

Update:

The unexpected behavior you are encountering ( getOperationDetail()

keeps getting called after completion first()

) seems to be a bug in rxjs

. As described in this issue ,

each take-ish statement (the one that completed before its Observable source) will maintain a subscription to the Observable source, combined with a statement that renews the subscription (here switchMap).

Both first

and takeWhile

are examples of operators and operators that "renew" a subscription, for example switchMap

, concatMap

and mergeMap

. In the following example, numbers will be stored in the log and the internal value concatMap

emits values:

var takeish$ = Rx.Observable.interval(200)
    // will keep logging until inner observable of `concatMap` is completed
    .do(x => console.log(x))
    .takeWhile(x => x < 2);

var source = takeish$
    .concatMap(x => Rx.Observable.interval(200).take(10))
    .subscribe();

      

It looks like this can be worked around by rotating an observable containing such an operator to an observable higher order - similar to how you did it:

var takeish$ = Rx.Observable.interval(200)
    // will log only 0, 1, 2
    .do(x => console.log(x))
    .takeWhile(x => x < 2);

var source = Rx.Observable.of(null)
    .switchMap(() => takeish$)
    .concatMap(x => Rx.Observable.interval(200).take(1))
    .subscribe();

      

Update 2:

It seems that the error described above still exists as of rxjs 5.4.2. This affects, for example, whether the source watched for the operator first

will unsubscribe if it first

meets the specified condition. When the operator first

follows immediately concatMap

, his observed source will not unsubscribe and will continue to emit values ​​until internal visibility concatMap

ends. In your case, this means it this._adminService.getOperationDetail()

will keep being called until the observation returned is complete createRecordSets()

.



Here's your example simplified to illustrate the behavior:

function registerDomain() {
    return Rx.Observable.of("operation")
        .concatMap(() => getOperationDetail()
            .concatMap(() => Rx.Observable.interval(200).take(5)));
}

function getOperationDetail() {
    return Rx.Observable.interval(100)
        // console.log() instead of the actual service call
        .do(x => console.log(x))
        .first(x => x === 2);
}

registerDomain().subscribe();
      

<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
      

Run codeHide result


If we expand the internal observation of the first operator concatMap

, we get the following observable:

Rx.Observable.interval(100)
    .do(x => console.log(x))
    .first(x => x === 2)
    .concatMap(() => Rx.Observable.interval(200).take(5));

      

Note that first

immediately follows concatMap

, which prevents the subscriber from abandoning the origin first

(i.e. interval(100).do(x => console.log(x)

). The values ​​will be kept in the log (or in your case, service calls will be sent) until the internal visibility ends concatMap

(i.e. interval(200).take(5)

).

If we change the above example and move the second concatMap

from the internal observable of the first concatMap

, first

it will no longer bind to it and unsubscribe to the source observable as soon as the condition is met, which means that the interval will stop emitting values ​​and no more numbers will be logged (or more service requests sent):

function registerDomain() {
    return Rx.Observable.of("operation")
        .concatMap(() => getOperationDetail())
        .concatMap(() => Rx.Observable.interval(200).take(5));
}

function getOperationDetail() {
    return Rx.Observable.interval(100)
        // console.log() instead of the actual service call
        .do(x => console.log(x))
        .first(x => x === 2);
}

registerDomain().subscribe();
      

<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
      

Run codeHide result


In this case, the internal observable can be decomposed only into:

Rx.Observable.interval(100)
    .do(x => console.log(x))
    .first(x => x === 2)

      

Please note that first

no more follows concatMap

.

It's also worth mentioning that in both cases the observable returned registerDomain()

produces exactly the same values, and if we move the log from statement do()

to subscribe()

, the same values ​​will be written to the console in both cases

registerDomain.subscribe(x => console.log(x));

      

+2


source







All Articles