Chain Observable after takeWhile is not called?
I have the following methods to be called like this:
-
registerDomain
should be called and should returnoperationId
- After
10
seconds, thegetOperationDetail
transmission should be calledoperationId
-
getOperationDetail
should be called every10
seconds until returningsuccessful
. - Once
getOperationDetail
completed,createRecordSets
should be called. - Finally,
getChangeStatus
should be called until it returnsINSYNC
- If any of the api calls throws an exception, how can I handle the error on the client side?
The following code below calls registerDomain and getOperationDetail, but after getOperationDetail completes, it doesn't move to createRecordSets.
registerDomain(domain) {
return this._adminService.registerDomain(domain)
.concatMap(operation => this.getOperationDetail(operation.OperationId))
.concatMap(() => this._adminService.createRecordSets(domain));
}
getOperationDetail(operationId) {
return Observable.interval(10000)
.mergeMap(() => this._adminService.getOperationDetail(operationId))
.takeWhile((info) => info.Status.Value !== 'SUCCESSFUL');
}
createRecordSets(caseWebsiteUrl) {
return this._adminService.createRecordSets(caseWebsiteUrl.Url)
.concatMap(registerId => this.getChangeStatus(registerId));
}
getChangeStatus(registerId) {
return Observable.interval(5000)
.mergeMap(() => this._adminService.getChange(registerId))
.takeWhile((info) => info.ChangeInfo.Status.Value !== 'INSYNC');
}
I updated getOperationDetail
to use the operator first
:
getOperationDetail(operationId) {
return Observable.interval(3000)
.mergeMap(() => this._adminService.getOperationDetail(operationId))
.first((info) => info.Status.Value === 'SUCCESSFUL')
}
Now he actually calls createRecordSets
, but after createRecordSets
he continues to be getOperationDetail
about 13 times and eventually causes getChangeStatus
. As I looked at this, I thought it would be:
- Call
getOperationDetail
until you receiveSUCCESS
. - Call
createRecordSets
once. - Call
getChangeStatus
until he getsINSYNC
- Done.
Why additional calls?
I changed registerDomain to look like this:
registerDomain(domain) {
return this._adminService.registerDomain(domain)
.concatMap(operation => this.getOperationDetail(operation.OperationId))
.concatMap((op) => this.createRecordSets(op));
Before I got .concatMap((op) => this.createRecordSets(op))
, shackled right after this.getOperationDetail
. Once I moved it outside of this, it started working as expected. However, I'm not sure why. Can someone please explain?
source to share
When it takeWhile
matches a value that meets the specified criteria, it ends the observable without spreading the value. This means that the operator of the next chain will not receive a value and will not call a callback.
Suppose, in your example, the first two calls this._adminService.getOperationDetail(...)
result in a failure status and the third call succeeds. This means that the observable returned getOperationDetail()
will only produce two values info
, each with an unsuccessful status. And what can also be important, the next coded statement concatMap
will reference its callback for each of these failed values, which means it createRecordSets()
will be called twice. I believe you can avoid this.
I would suggest using first
:
getOperationDetail(operationId) {
return Observable.interval(10000)
.concatMap(() => this._adminService.getOperationDetail(operationId))
.first(info => info.Status.Value !== 'SUCCESSFUL');
}
Thus, it getOperationDetail()
will only return one "successful" value as soon as this._adminService.getOperationDetail(operationId)
it succeeds. The operator first
emits the first observable value that matches the specified condition and then exits.
And when it comes to error handling, catch
or retry
may be helpful.
Update:
The unexpected behavior you are encountering ( getOperationDetail()
keeps getting called after completion first()
) seems to be a bug in rxjs
. As described in this issue ,
each take-ish statement (the one that completed before its Observable source) will maintain a subscription to the Observable source, combined with a statement that renews the subscription (here switchMap).
Both first
and takeWhile
are examples of operators and operators that "renew" a subscription, for example switchMap
, concatMap
and mergeMap
. In the following example, numbers will be stored in the log and the internal value concatMap
emits values:
var takeish$ = Rx.Observable.interval(200)
// will keep logging until inner observable of `concatMap` is completed
.do(x => console.log(x))
.takeWhile(x => x < 2);
var source = takeish$
.concatMap(x => Rx.Observable.interval(200).take(10))
.subscribe();
It looks like this can be worked around by rotating an observable containing such an operator to an observable higher order - similar to how you did it:
var takeish$ = Rx.Observable.interval(200)
// will log only 0, 1, 2
.do(x => console.log(x))
.takeWhile(x => x < 2);
var source = Rx.Observable.of(null)
.switchMap(() => takeish$)
.concatMap(x => Rx.Observable.interval(200).take(1))
.subscribe();
Update 2:
It seems that the error described above still exists as of rxjs 5.4.2. This affects, for example, whether the source watched for the operator first
will unsubscribe if it first
meets the specified condition. When the operator first
follows immediately concatMap
, his observed source will not unsubscribe and will continue to emit values ββuntil internal visibility concatMap
ends. In your case, this means it this._adminService.getOperationDetail()
will keep being called until the observation returned is complete createRecordSets()
.
Here's your example simplified to illustrate the behavior:
function registerDomain() {
return Rx.Observable.of("operation")
.concatMap(() => getOperationDetail()
.concatMap(() => Rx.Observable.interval(200).take(5)));
}
function getOperationDetail() {
return Rx.Observable.interval(100)
// console.log() instead of the actual service call
.do(x => console.log(x))
.first(x => x === 2);
}
registerDomain().subscribe();
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
If we expand the internal observation of the first operator concatMap
, we get the following observable:
Rx.Observable.interval(100)
.do(x => console.log(x))
.first(x => x === 2)
.concatMap(() => Rx.Observable.interval(200).take(5));
Note that first
immediately follows concatMap
, which prevents the subscriber from abandoning the origin first
(i.e. interval(100).do(x => console.log(x)
). The values ββwill be kept in the log (or in your case, service calls will be sent) until the internal visibility ends concatMap
(i.e. interval(200).take(5)
).
If we change the above example and move the second concatMap
from the internal observable of the first concatMap
, first
it will no longer bind to it and unsubscribe to the source observable as soon as the condition is met, which means that the interval will stop emitting values ββand no more numbers will be logged (or more service requests sent):
function registerDomain() {
return Rx.Observable.of("operation")
.concatMap(() => getOperationDetail())
.concatMap(() => Rx.Observable.interval(200).take(5));
}
function getOperationDetail() {
return Rx.Observable.interval(100)
// console.log() instead of the actual service call
.do(x => console.log(x))
.first(x => x === 2);
}
registerDomain().subscribe();
<script src="https://unpkg.com/@reactivex/rxjs@5.0.3/dist/global/Rx.js"></script>
In this case, the internal observable can be decomposed only into:
Rx.Observable.interval(100)
.do(x => console.log(x))
.first(x => x === 2)
Please note that first
no more follows concatMap
.
It's also worth mentioning that in both cases the observable returned registerDomain()
produces exactly the same values, and if we move the log from statement do()
to subscribe()
, the same values ββwill be written to the console in both cases
registerDomain.subscribe(x => console.log(x));
source to share