Reading CSV files in a stream and storing them in a database
I have some huge CSV files that I need to store in a MongoDB database. Since the files are too big to load at once, I need to use a stream. I pause the stream while writing data to the database.
var fs = require('fs');
var csv = require('csv');
var mongo = require('mongodb');

var db = mongo.MongoClient.connect...

var readStream = fs.createReadStream('hugefile.csv');
readStream.on('data', function(data) {
  readStream.pause();
  csv.parse(data.toString(), { delimiter: ',' }, function(err, output) {
    db.collection(coll).insert(output, function(err) {
      readStream.resume();
    });
  });
});
readStream.on('end', function() {
  logger.info('file stored');
});
But csv.parse keeps erroring out, because a 'data' chunk is just an arbitrary slice of the file, while I need to read it line by line to treat it as CSV and convert each row to JSON for MongoDB. Maybe I shouldn't pause and resume the stream at all and should use pipes instead, but I haven't found a solution for this yet.
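For example, a 'data' chunk can end in the middle of a row, so the parser sees truncated records (the contents here are made up):

// chunk 1 ends mid-row:
'id,name\n1,alice\n2,bo'
// chunk 2 starts with the rest of it:
'b\n3,carol\n'
// csv.parse() then chokes on '2,bo' and 'b' as separate, malformed rows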
Any help would be appreciated!
I think you might want to create a stream of lines from your stream of raw data.
Here's an example from the split package: https://www.npmjs.com/package/split
var fs = require('fs');
var split = require('split');

fs.createReadStream(file)
  .pipe(split())
  .on('data', function(line) {
    // each chunk is now a separate line!
  });
Adapted to your example, it might look like this:
var split = require('split');

var readStream = fs.createReadStream('hugefile.csv');
var lineStream = readStream.pipe(split());
lineStream.on('data', function(data) {
  // remaining code unmodified
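For completeness, here is a sketch of how the whole pipeline could look with split() in place. The 'items' collection name, the per-line error handling, and the row-to-document mapping are my assumptions, and db is assumed to be a connected handle from MongoClient.connect as in your question:

var fs = require('fs');
var csv = require('csv');
var split = require('split');

var lineStream = fs.createReadStream('hugefile.csv').pipe(split());

lineStream.on('data', function(line) {
  if (!line) return; // split() emits an empty string for a trailing newline
  lineStream.pause();
  csv.parse(line, { delimiter: ',' }, function(err, output) {
    if (err) {
      console.log(err);
      return lineStream.resume(); // don't leave the stream paused on error
    }
    // a single line parses to one record: output[0] is an array of fields;
    // how you map it to a document is up to you, wrapping it is a placeholder
    db.collection('items').insert({ fields: output[0] }, function(err) {
      if (err) console.log(err);
      lineStream.resume();
    });
  });
});

lineStream.on('end', function() {
  console.log('file stored');
});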
I'm not sure if bulk() was a thing back in '15, but anyone trying to import items from large sources should consider using it.
var fs = require('fs');
var csv = require('fast-csv');
var mongoose = require('mongoose');

var db = mongoose.connect...

// Item is assumed to be a Mongoose model defined elsewhere
var counter = 0; // to keep count of values queued in the bulk op
const BULK_SIZE = 1000;
var bulkItem = Item.collection.initializeUnorderedBulkOp();

var readStream = fs.createReadStream('hugefile.csv');
const csvStream = csv.fromStream(readStream, { headers: true });
csvStream.on('data', data => {
  counter++;
  bulkItem.insert(data);
  if (counter === BULK_SIZE) {
    csvStream.pause();
    bulkItem.execute((err, result) => {
      if (err) console.log(err);
      counter = 0;
      bulkItem = Item.collection.initializeUnorderedBulkOp();
      csvStream.resume();
    });
  }
});
csvStream.on('end', () => {
  // flush the last partial batch, or rows past the final full batch are lost
  if (counter > 0) {
    bulkItem.execute(err => {
      if (err) console.log(err);
    });
  }
});
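For anyone reading this later: newer versions of fast-csv replaced fromStream() with a pipeable csv.parse(), and with promises the same batching idea can be written more compactly. A sketch, assuming a Mongoose model named Item and a local MongoDB instance (both are my assumptions):

const fs = require('fs');
const csv = require('fast-csv');
const mongoose = require('mongoose');

mongoose.connect('mongodb://localhost/test');
// a schemaless model, purely for illustration
const Item = mongoose.model('Item', new mongoose.Schema({}, { strict: false }));

const BULK_SIZE = 1000;
let batch = [];

const csvStream = fs.createReadStream('hugefile.csv')
  .pipe(csv.parse({ headers: true }))
  .on('data', row => {
    batch.push(row);
    if (batch.length === BULK_SIZE) {
      csvStream.pause(); // backpressure: stop reading while the insert runs
      Item.insertMany(batch)
        .then(() => {
          batch = [];
          csvStream.resume();
        })
        .catch(err => console.log(err));
    }
  })
  .on('end', () => {
    if (batch.length) {
      // flush the last partial batch
      Item.insertMany(batch).catch(err => console.log(err));
    }
  });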