Skip to content

Instantly share code, notes, and snippets.

@rgarcia
Last active December 11, 2015 22:28
Show Gist options
  • Save rgarcia/4669578 to your computer and use it in GitHub Desktop.
Save rgarcia/4669578 to your computer and use it in GitHub Desktop.
node streams v2
CSVP <Buffer 61 2c 62 2c 63 2c 64 0a 31 2c 32 2c 33 2c 34 0a 35 2c 36 2c 37 2c 38 0a>
DUMP { a: '1', b: '2', c: '3', d: '4' }
_ = require 'underscore'
stream = require 'stream'
fs = require 'fs'
csv = require 'csv'
# takes in a csv file stream, emits json objects
class CSVParser extends stream.Transform
constructor: (@options) ->
super @options
@csv = csv().from.options @options.from
@csv.on 'record', (record) => @push record
@csv.on 'end', (count) => @end()
@csv.on 'error', (error) => @end(error)
_transform: (chunk, output, done) =>
console.log 'CSVP', chunk if @options.debug
@csv.write chunk
done()
# passthrough that dumps to stdout
class Dump extends stream.Transform
_transform: (chunk, output, done) =>
console.log 'DUMP', chunk
output chunk
done()
# _.filter
class Filter extends stream.Transform
constructor: (@options) ->
@options = { test: @options } if _(@options).isFunction()
super @options
_transform: (chunk, output, done) =>
output chunk if @options.test chunk
done()
filestream = fs.createReadStream "#{__dirname}/test1.csv"
csvparser = new CSVParser {from: { header: true, columns: true}, debug: true}
filter = new Filter (obj) -> obj.a < 5
dump = new Dump()
filestream.pipe(csvparser).pipe(filter).pipe(dump)
a b c d
1 2 3 4
5 6 7 8
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment