Reading csv files in a stream and storing them in a database

I have some huge csv files that I need to store in a mongo database. Since the files are too big to read into memory at once, I need to use a stream. I pause the stream while writing the data to the database.

var fs = require('fs');
var csv = require('csv');
var mongo = require('mongodb');

var db = mongo.MongoClient.connect...

var readStream = fs.createReadStream('hugefile.csv');
readStream.on('data', function(data) {
  readStream.pause();
  csv.parse(data.toString(), { delimiter: ','}, function(err, output) {
    db.collection(coll).insert(data, function(err) {
      readStream.resume();
    });
  });
});
readStream.on('end', function() {
  logger.info('file stored');
});


But csv.parse keeps erroring out, because the chunks emitted by the read stream are arbitrary slices of the file rather than complete lines, so they can't be treated as CSV and converted to JSON for mongodb. Maybe I shouldn't pause the stream at all and should use piping instead, but I haven't found a solution for this yet.

Any help would be appreciated!



2 answers


I think you might want to create a stream of lines from the stream of raw data.

Here's an example from the split package: https://www.npmjs.com/package/split

var split = require('split');

fs.createReadStream(file)
  .pipe(split())
  .on('data', function (line) {
    // each chunk is now a separate line!
  });



Adapted to your example, it might look like this:

var readStream = fs.createReadStream('hugefile.csv');
var lineStream = readStream.pipe(split());
lineStream.on('data', function(data) {
  // remaining code unmodified: 'data' is now a single complete CSV line
});



I'm not sure if bulk() was a thing back in '15, but anyone trying to import items from large sources should consider using bulk operations.



var fs = require('fs');
var csv = require('fast-csv');
var mongoose = require('mongoose');

var db = mongoose.connect...

var counter = 0;        // number of documents queued in the current bulk()
const BULK_SIZE = 1000;
var bulkItem = Item.collection.initializeUnorderedBulkOp(); // Item is a mongoose model

var readStream = fs.createReadStream('hugefile.csv');
const csvStream = csv.fromStream(readStream, { headers: true });
csvStream.on('data', data => {
  counter++;
  bulkItem.insert(data);

  if (counter === BULK_SIZE) {
    csvStream.pause();                // stop parsing while the batch is written
    bulkItem.execute((err, result) => {
      if (err) console.log(err);
      counter = 0;
      bulkItem = Item.collection.initializeUnorderedBulkOp(); // start a fresh batch
      csvStream.resume();
    });
  }
});
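One thing to watch with this approach: when the csv stream ends, any rows queued since the last full batch are still sitting in the bulk operation and never get written. A minimal sketch of flushing them on the 'end' event, assuming the same counter and bulkItem variables as above:

csvStream.on('end', () => {
  if (counter > 0) {
    // flush whatever is left over from the last partial batch
    bulkItem.execute(err => {
      if (err) console.log(err);
      console.log('file stored');
    });
  } else {
    console.log('file stored');
  }
});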

