Rx.js waiting for callback to complete

I am using Rx.js to process the content of a file, make an HTTP request for each line, and then aggregate the results. However, the source file is thousands of lines long and I am overloading the remote http api with which I am making an HTTP request. I need to make sure that I am expecting an existing HTTP request to be called back before starting another one. I would be open to batch processing and executing requests n

at the same time, but for this script, executing requests in a serial file is sufficient.

I have the following:

const fs = require('fs');
const rx = require('rx');
const rxNode = require('rx-node');

const doHttpRequest = rx.Observable.fromCallback((params, callback) => {
  process.nextTick(() => {
    callback('http response');
  });
});

rxNode.fromReadableStream(fs.createReadStream('./source-file.txt'))
  .flatMap(t => t.toString().split('\r\n'))
  .take(5)
  .concatMap(t => {
    console.log('Submitting request');

    return doHttpRequest(t);
  })
  .subscribe(results => {
    console.log(results);
  }, err => {
    console.error('Error', err);
  }, () => {
    console.log('Completed');
  });

      

However, this does not execute HTTP requests in a sequential order. It outputs:

Submitting request
Submitting request
Submitting request
Submitting request
Submitting request
http response
http response
http response
http response
http response
Completed

If I remove the call concatAll()

, then the requests are serial, but my subscription function sees the observables before the http requests are returned.

How can I make HTTP requests one by one so that the result is below?

Submitting request
http response
Submitting request
http response
Submitting request
http response
Submitting request
http response
Submitting request
http response
Completed
+1


source to share


1 answer


The problem here is probably that when used, the rx.Observable.fromCallback

function you passed in the argument is executed immediately. The returned observation will contain the value passed to the callback at a later point in time. To better understand what's going on, you should use a slightly more sophisticated simulation: the number of your requests so that they return the actual (different for each request) result, which you can observe via subscription.

What am I doing here:

  • take(5)

    gives 5 values
  • map

    issues 5 log messages, performs 5 functions and transmits 5 monitored
  • these 5 observables are processed concatAll

    , and the values โ€‹โ€‹returned by these observables will be fine as expected. What you order here is the result of function calls, not calls to the functions themselves.

To achieve your goal, you only need to call your observable factory ( rx.Observable.fromCallback

) when concatAll

subscribing to it, not at creation time. For this you can use defer

: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/defer.md



Thus, your code will turn into:

rxNode.fromReadableStream(fs.createReadStream('./path-to-file'))
  .map(t => t.toString().split('\r\n'))
  .flatMap(t => t)
  .take(5)
  .map(t => {
    console.log('Submitting request');

    return Observable.defer(function(){return doHttpRequest(t);})
  })
  .concatAll()
  .subscribe(results => {
    console.log(results);
  }, err => {
    console.error('Error', err);
  }, () => {
    console.log('Completed');
  });

      

You can see a similar problem with a great explanation here: How to start a second observable * only * after first being * completely * done in rxjs

Your log will still display 5 consecutive "Submitting Request" messages. But your request has to be fulfilled one by one if you want.

+1


source







All Articles