Observable forkJoin does not fire
I am trying to use forkJoin on two Observables. One of them starts as a stream ... If I subscribe to them directly, I get a response, but forkJoin does not fire. Any ideas?
private data$: Observable<any[]>;
private statuses$: Observable<any[]>;
private queryStream = new Subject<string>();
....
this.data$ = this.queryStream
  .startWith('')
  .flatMap(queryInput => {
    this.query = queryInput;
    return this._companyService.getCompanies(this.queryRequired + ' ' + this.query, this.page, this.sort);
  })
  .share();
...
Observable.forkJoin(this.statuses$, this.companies$)
  .subscribe(res => {
    console.log('forkjoin');
    this._countStatus(res[0], res[1]);
  });
// This shows arrays in the console...
this.statuses$.subscribe(res => console.log(res));
this.companies$.subscribe(res => console.log(res));
// In the console
Array[9]
Array[6]
A very common problem with forkJoin is that it requires every source Observable to emit at least one item, and all of them must complete.
In other words, forkJoin emits nothing until both this.statuses$ and this.companies$ have completed, and it never emits a value if either of them completes without emitting. You can check whether a source completes by passing a completion callback:
this.statuses$.subscribe(
  res => console.log(res),
  undefined,
  () => console.log('completed'),
);
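To make the completion requirement concrete: a stream built on a Subject, like queryStream in the question, does not complete on its own, so forkJoin keeps waiting. The following is only a minimal sketch, assuming RxJS 6+ import paths; statuses$ and query$ are hypothetical stand-ins, not the asker's real services.

```typescript
import { forkJoin, of, Subject } from 'rxjs';

// Hypothetical stand-ins for the streams in the question.
const statuses$ = of(['open', 'closed']);   // emits once, then completes
const query$ = new Subject<string[]>();     // stays open until complete() is called

const log: string[] = [];

forkJoin([statuses$, query$]).subscribe({
  next: ([statuses, companies]) =>
    log.push(`next: ${statuses.length} statuses, ${companies.length} companies`),
  complete: () => log.push('complete'),
});

query$.next(['Acme']);    // forkJoin stays silent: query$ has not completed yet
log.push('after next');
query$.complete();        // now forkJoin emits the last values and completes

console.log(log);
```

The 'after next' entry lands in the log before forkJoin's value, which shows that emitting alone is not enough; the source has to complete.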
forkJoin emits only when all of its source observables have completed. If you want a forkJoin equivalent that waits for just one emission from each source, use combineLatest + take(1):
combineLatest(
  this.statuses$,
  this.companies$,
)
  .pipe(
    take(1),
  )
  .subscribe(([statuses, companies]) => {
    console.log('forkjoin');
    this._countStatus(statuses, companies);
  });
As soon as both sources have emitted, combineLatest emits the pair and take(1) immediately unsubscribes.
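As an illustration of that behaviour, here is a small sketch, assuming RxJS 6+ import paths and using hypothetical stand-in streams (one that completes, one Subject that never does). It uses the array form of combineLatest.

```typescript
import { combineLatest, of, Subject } from 'rxjs';
import { take } from 'rxjs/operators';

// Hypothetical stand-ins: one finite source, one that never completes.
const statuses$ = of(['open', 'closed']);
const companies$ = new Subject<string[]>();

const results: string[] = [];

combineLatest([statuses$, companies$])
  .pipe(take(1))
  .subscribe({
    next: ([statuses, companies]) =>
      results.push(`${statuses.length} statuses, ${companies.length} companies`),
    complete: () => results.push('complete'),
  });

companies$.next(['Acme', 'Globex']);  // both sources have a value: emit, then take(1) completes
companies$.next(['Initech']);         // ignored: the subscription is already closed

console.log(results);
```

Unlike forkJoin, this fires even though companies$ never completes, but it delivers the first available pair rather than the final values.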
forkJoin didn't work for me, so I used the code below to solve my problem. With mergeMap you can map each result of the outer subscription onto the inner subscription, pair the two values, and subscribe to the combined stream.
this.statuses$.pipe(
  mergeMap(source => this.companies$.pipe(
    map(inner => [source, inner])
  ))
).subscribe(([e, r]) => {
  console.log(e, r);
});
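One thing to keep in mind with this approach is that mergeMap subscribes to the inner stream once per outer value and forwards every inner emission, so it can emit many pairs rather than a single combined result. A rough sketch of that behaviour, assuming RxJS 6+ import paths and hypothetical stand-in streams:

```typescript
import { of, Subject } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

// Hypothetical stand-ins: two outer values, and an inner stream that stays open.
const statuses$ = of('active', 'archived');
const companies$ = new Subject<string>();

const pairs: string[] = [];

statuses$
  .pipe(
    // One inner subscription to companies$ is created per outer value.
    mergeMap(status => companies$.pipe(map(company => `${status}:${company}`))),
  )
  .subscribe(pair => pairs.push(pair));

companies$.next('Acme');    // delivered to both inner subscriptions
companies$.next('Globex');  // delivered to both again

console.log(pairs);
```

If only one combined result is wanted, adding take(1) to the pipeline is one option.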
Observable.forkJoin([
  _someService.getUsers(),
  _someService.getCustomers(),
])
  .subscribe((data: [Array<User>, Array<Customer>]) => {
    let users: Array<User> = data[0];
    let customer: Array<Customer> = data[1];
  }, err => {
  });
// someService
// The return types are arrays, to match how the results are consumed above.
getUsers(): Observable<User[]> {
  let url = '/users';
  return this._http.get(url, headers)
    .map(res => res.json());
}
getCustomers(): Observable<Customer[]> {
  let url = '/customers';
  return this._http.get(url, headers)
    .map(res => res.json());
}