How to work with large files, NodeJS streams and pipes

I'm a bit new to NodeJS streams, and the more I learn about them, the more I find them to be not a particularly simple, stable thing. I am trying to read large files with csv / csv-parse (apparently the most popular CSV module for NodeJS) using the piping API, which involves using stream-transform by the same author.

Some of what I'm experiencing here is reproducible without actually using a parser, so I've commented out those parts to keep the example simpler (for those who prefer JavaScript over CoffeeScript, there's a JS version as well):

#-------------------------------------------------------------------------------
fs                        = require 'fs'
transform_stream          = require 'stream-transform'
log                       = console.log
as_transformer            = ( method ) -> transform_stream method, parallel: 11
# _new_csv_parser           = require 'csv-parse'
# new_csv_parser            = -> _new_csv_parser delimiter: ','

#-------------------------------------------------------------------------------
$count = ( input_stream, title ) ->
  count = 0
  #.............................................................................
  input_stream.on 'end', ->
    log ( title ? 'Count' ) + ':', count
  #.............................................................................
  return as_transformer ( record, handler ) =>
    count += 1
    handler null, record

#-------------------------------------------------------------------------------
read_trips = ( route, handler ) ->
  # parser      = new_csv_parser()
  input       = fs.createReadStream route
  #.............................................................................
  input.on 'end', ->
    log 'ok: trips'
    return handler null
  input.setMaxListeners 100 # <<<<<<
  #.............................................................................
  # input.pipe parser
  input.pipe $count input, 'trips A'
    .pipe $count    input, 'trips B'
    .pipe $count    input, 'trips C'
    .pipe $count    input, 'trips D'
    # ... and so on ...
    .pipe $count    input, 'trips Z'
  #.............................................................................
  return null

route = '/Volumes/Storage/cnd/node_modules/timetable-data/germany-berlin-2014/trips.txt'
read_trips route, ( error ) ->
  throw error if error?
  log 'ok'


The input file contains 204865 lines of GTFS data; I am not parsing it here, just reading it in its raw form, so I assume what I am counting in the above code are chunks of data.
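For what it's worth, that assumption is easy to verify with a few lines (a sketch, not part of the original script): fs.createReadStream emits Buffer chunks of 64 KiB by default, so a file of several megabytes yields on the order of 150 data events rather than one event per line.

fs     = require 'fs'
log    = console.log
chunks = 0
input  = fs.createReadStream route # `route` as defined above
input.on 'data', -> chunks += 1
input.on 'end',  -> log 'raw chunks:', chunks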

I pipe the stream from counter to counter and expect the last counter to be hit as often as the first; however, this is what I get:

trips A: 157
trips B: 157
trips C: 157
...
trips U: 157
trips V: 144
trips W: 112
trips X: 80
trips Y: 48
trips Z: 16


In an earlier setup where I was actually parsing the data, I got this:

trips A: 204865
trips B: 204865
trips C: 204865
...
trips T: 204865
trips U: 180224
trips V: 147456
trips W: 114688
trips X: 81920
trips Y: 49152
trips Z: 16384


So it would seem that the stream somehow dries up along the way.

My suspicion was that the input stream's end event is not a reliable signal to listen to when trying to decide whether all processing is complete; after all, it stands to reason that processing can only finish some time after the stream has been fully consumed.

So I looked for another event to listen to (and did not find one) and deferred the callback call (with setTimeout, process.nextTick and setImmediate), to no avail.
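For clarity, this is roughly how the deferrals were wired into the end handler (a minimal sketch, not the original code); the comments note the event-loop phase each scheduler uses:

input.on 'end', ->
  # process.nextTick runs its callback before the event loop moves on:
  process.nextTick -> handler null
  # setImmediate runs in the 'check' phase, after pending I/O callbacks:
  # setImmediate -> handler null
  # setTimeout with 0 ms fires in the 'timers' phase of a later iteration:
  # setTimeout ( -> handler null ), 0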

It would be great if someone could point out

  • (1) what the significant differences between setTimeout, process.nextTick and setImmediate are in this context, and
  • (2) how to reliably determine whether the last byte has been processed by the last member of the pipeline (one candidate approach is sketched right after this list).
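For concreteness, one candidate approach for (2), sketched under the assumption that each $count returns an ordinary Transform stream (which both stream-transform and event-stream do), would be to keep a reference to the last member of the pipeline and listen for its finish event there, rather than listening on the input:

read_trips = ( route, handler ) ->
  input = fs.createReadStream route
  # keep a reference to the *last* transform in the chain:
  last  = input
    .pipe $count input, 'trips A'
    .pipe $count input, 'trips B'
    # ... and so on ...
  #.............................................................................
  last.on 'finish', ->
    log 'ok: trips'
    handler null
  last.resume() # keep the readable side of the final transform flowing
  return null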

Update: I now suspect the culprit is stream-transform, which has an open issue where someone reported a very similar problem with almost identical numbers (they have 234841 records and end up with 16390, I have 204865 and end up with 16384). Not proof, but too close to be coincidence.

I swapped out stream-transform and used event-stream.map instead; the test then passes.
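For reference, the swap itself is small (a sketch assuming the event-stream module; es.map uses the same (record, callback) signature, so the counter body stays unchanged):

es             = require 'event-stream'
as_transformer = ( method ) -> es.map method

$count = ( input_stream, title ) ->
  count = 0
  input_stream.on 'end', ->
    log ( title ? 'Count' ) + ':', count
  return as_transformer ( record, handler ) ->
    count += 1
    handler null, record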



1 answer


After a few days, I think I can say that stream-transform has problems with large files.



Since then I have switched to event-stream, which is IMHO the best solution overall as it is completely generic (i.e. about streams in general, not about CSV data as streams in particular). I have outlined some thoughts on streaming libraries in NodeJS in the docs for my nascent pipdreams module, which provides a number of widely used stream operations.
