Node.js: read from file and save to db, limit max concurrent db operations

I have a CSV file that I read as a stream, use transforms to convert each line to JSON, and then asynchronously save each line to the DB.

The problem is that reading the file is fast, so it quickly leads to a very large number of concurrent asynchronous database operations, which brings the application to a halt.

I would like to restrict the application so that at most N database operations are in flight at any given time.

This is the core of my _transform function:

parser._transform = function(data, encoding, done) {
    //push data rows
    var tick = this._parseRow(data);

    //Store tick
    db.set(tick.date, tick, function(err, result) {
      console.log(result);
      if(err) throw err;
    });

    this.push(tick);
    done();
};


I looked at several options, and these seemed to be the best candidates:

  • Use the async library's forEachLimit
    • The problem I see here is that inside my transform I only ever have one object (one line of the file) at a time when issuing operations, so there is no collection to iterate over (a batching workaround is sketched right after this list).
    • Reading the entire file into memory first is not possible because of its size.
  • Use an asynchronous, parallel, concurrency-limited solution as described in section 7.2.3 of:
    • http://book.mixu.net/node/ch7.html
    • The problem for me here is what to do once the limit is reached.
    • Spinning or using setTimeout seems to eat up all the event-loop time and prevents my DB callbacks from ever decrementing the running counter.
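
For illustration, a batching workaround for option 1 might look roughly like the sketch below (BATCH_SIZE, LIMIT and the flushBatch helper are made-up names; parser and db are the objects from above):

var async = require('async');

var BATCH_SIZE = 500;   // illustrative: rows buffered before each flush
var LIMIT = 10;         // illustrative: max concurrent db.set calls per flush
var batch = [];

function flushBatch(rows, done) {
    // write one buffered batch with at most LIMIT concurrent db.set calls
    async.forEachLimit(rows, LIMIT, function(tick, cb) {
        db.set(tick.date, tick, cb);
    }, done);
}

parser._transform = function(data, encoding, done) {
    var tick = this._parseRow(data);
    this.push(tick);
    batch.push(tick);

    if (batch.length < BATCH_SIZE) return done();

    // holding back done() until the flush finishes gives natural back-pressure;
    // a _flush implementation would also be needed for the final partial batch
    flushBatch(batch.splice(0), done);
};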

These were my initial attempts at a "limited concurrency" solution:

var limit = 100;
var running = 0;

parser._transform = function(data, encoding, done) {
  //push data rows
  var tick = this._parseRow(data);

  this.push(tick);
  //Store tick to db
  if (running < limit) {
    console.log("limit not reached, scheduling set");
    running++;
    db.set(tick.date, tick, function(err, result) {
      running--;
      console.log("running is:" + running);
      console.log(result);
      if(err) throw err;
    });
    done();
  } else {
    console.log("max limit reached, sleeping");
    // pass a function reference so setTimeout doesn't invoke _transform immediately,
    // and don't call done() here: it is handed along to the retried call
    setTimeout(this._transform.bind(this, data, encoding, done), 1000);
  }
};
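
For comparison, a timer-free variant of the same counter idea is sketched below: instead of calling setTimeout, _transform holds back done() until one of the pending writes completes, so the stream itself applies the back-pressure. This is a rough, untested sketch reusing limit and running from above, with a new pendingDone variable:

var pendingDone = null;  // at most one _transform call is ever waiting

parser._transform = function(data, encoding, done) {
  var tick = this._parseRow(data);
  this.push(tick);

  running++;
  db.set(tick.date, tick, function(err, result) {
    running--;
    if (err) throw err;
    // if a transform call is waiting for a free slot, let it continue now
    if (pendingDone && running < limit) {
      var resume = pendingDone;
      pendingDone = null;
      resume();
    }
  });

  if (running < limit) {
    done();              // room left: accept the next row immediately
  } else {
    pendingDone = done;  // at the limit: resume only when a write finishes
  }
};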


I only started with node.js this week, so I don't know what the correct model for solving this problem is.

Note: I do know that, if I go with the latter model, the delay should be at least exponential, and that there should be some sort of "max backoffs" cap so that it doesn't blow the call stack. I've tried to keep it simple here for the moment, though.
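
For illustration, a capped exponential backoff along those lines might be sketched like this (BASE_DELAY, MAX_RETRIES and retrySet are made-up names; limit and running are the counters from the attempt above):

var BASE_DELAY = 50;     // ms; illustrative starting delay
var MAX_RETRIES = 6;     // illustrative cap on retry attempts

function retrySet(tick, attempt, done) {
  if (running < limit) {
    running++;
    db.set(tick.date, tick, function(err) {
      running--;
      done(err);
    });
  } else if (attempt >= MAX_RETRIES) {
    done(new Error("db still saturated after " + attempt + " retries"));
  } else {
    // wait 50ms, 100ms, 200ms, ... before checking the limit again
    setTimeout(function() {
      retrySet(tick, attempt + 1, done);
    }, BASE_DELAY * Math.pow(2, attempt));
  }
}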

+3




2 answers


The limited concurrency solution is the approach I would take, but instead of implementing it myself, I just used the async library, in particular its queue (async.queue).

Something like:

var async = require('async');

var dbQueue = async.queue(function(tick, callback) {
    db.set(tick.date, tick, function(err, result) {
        console.log(result);
        callback(err, result);
    });
}, 3); // the last arg (3) is the concurrency level; tweak as needed

parser._transform = function(data, encoding, done) {
    //push data rows
    var tick = this._parseRow(data);

    dbQueue.push(tick);

    this.push(tick);
    done();
};




This will limit your db operations to 3 at a time. In addition, you can use the queue's saturated and empty callbacks to pause() / resume() your stream and keep resource use even more bounded (worthwhile if you are reading really large files). It looks like this:

dbQueue.saturated = function() {
    parser.pause();
}

dbQueue.empty = function() {
    parser.resume();
}
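
If you also need to know when every queued write has finished (for example before closing the db connection or ending the process), the queue's drain callback can be assigned in the same way; a minimal sketch, assuming the dbQueue above:

dbQueue.drain = function() {
    // fires once the last queued db.set has completed and the queue is empty
    console.log("all pending db writes are done");
};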


+2




The database is limited to one concurrent disk write at any given time, so any concurrent writes just slow the whole operation down. If the file is small enough, try reading the entire file into memory and then writing it to the database in a single operation. Otherwise, split it into chunks as large as you can manage and write them one at a time.
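
If you go the chunked route, a minimal sketch could look like the following; db.batchSet is a hypothetical bulk-write call (substitute whatever your database driver actually provides) and CHUNK_SIZE is arbitrary:

var CHUNK_SIZE = 1000;   // illustrative chunk size
var chunk = [];

parser._transform = function(data, encoding, done) {
    var tick = this._parseRow(data);
    this.push(tick);
    chunk.push(tick);
    if (chunk.length < CHUNK_SIZE) return done();
    // db.batchSet is hypothetical: one bulk write per chunk,
    // and the next chunk is only read after this write returns
    db.batchSet(chunk.splice(0), done);
};

parser._flush = function(done) {
    if (chunk.length === 0) return done();
    db.batchSet(chunk.splice(0), done);   // write the final partial chunk
};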



+1








