Observable forkJoin does not fire

I am trying to user forkJoin

on two Observables. One of them starts as a stream ... If I signed them directly, I get a response, forkJoin

does not fire. Any ideas?

private data$: Observable<any[]>;
private statuses$: Observable<any[]>;
private queryStream = new Subject<string>();    

....

this.data$ = this.queryStream
    .startWith('')
     .flatMap(queryInput => {
            this.query = queryInput
            return this._companyService.getCompanies(this.queryRequired + ' ' + this.query, this.page, this.sort);
                })
            .share();

...

Observable.forkJoin(this.statuses$, this.companies$)
            .subscribe(res => {
                console.log('forkjoin');
                this._countStatus(res[0], res[1]);
            });


// This shows arrays in the console...

this.statuses$.subscribe(res => console.log(res));
this.companies$.subscribe(res => console.log(res));

// In the console
Array[9]
Array[6]

      

+2


source to share


4 answers


A very common problem with forkJoin

is that all Observables sources require at least one element to be emitted, and they must all be terminated.

In other words, if this.statuses$

or this.companies$

does not emit any element, and until both complete forkJoin

, it will emit nothing.



this.statuses$.subscribe(
    res => console.log(res),
    undefined,
    () => console.log('completed'),
);

      

+3


source


forkJoin

emits only when all internal observables are complete. If you want an equivalent forkJoin

that just listens for one radiation from each source, use combineLatest

+take(1)

combineLatest(
  this.statuses$,
  this.companies$,
)
.pipe(
  take(1),
)
.subscribe(([statuses, companies]) => {
  console.log('forkjoin');
  this._countStatus(statuses, companies);
});

      



As soon as both sources combineLatest

, combineLatest

and take(1)

immediately unsubscribe.

+1


source


forkJoin

didn't work, so I used the below code to solve my problem. With the help mergeMap

you can match the result of the external subscription with the internal subscription and subscribe to it if you mergeMap

want.

this.statuses$.pipe(
    mergeMap(source => this.companies$.pipe(
        map(inner => [source , inner])
        )
    )
).subscribe(([e , r]) => {
    console.log(e , r);
})

      

+1


source


Observable.forkJoin([
      _someService.getUsers(),
      _someService.getCustomers(),
    ])
      .subscribe((data: [Array<User>, Array<Customer>]) => {
        let users: Array<User> = data[0];
        let customer: Array<Customer> = data[1];
      }, err => {
      });





      //someService
        getUsers():Observable<User> {
          let url = '/users';
          return this._http.get(url, headers)
            .map(res => res.json());
        }

        getCustomers():Observable<Customer> {
          let url = '/customers';
          return this._http.get(url, headers)
            .map(res => res.json());
        }

      

0


source







All Articles