Skip to content

Instantly share code, notes, and snippets.

@pchw
Created March 3, 2015 14:30
Show Gist options
  • Save pchw/7b772e7571f1acad67f0 to your computer and use it in GitHub Desktop.
Save pchw/7b772e7571f1acad67f0 to your computer and use it in GitHub Desktop.
serialize stream
through2 = require 'through2'
stream = require 'stream'
ReadableStream = stream.Readable
WritableStream = stream.Writable
cb = ->
filter = through2.obj (chunk, enc, callback)->
console.log 'locked'
chunk.sales = "$#{chunk.sales}"
@push chunk
cb = callback
filter.on 'unlock', ->
console.log 'unlocked'
do cb
filter2 = through2.obj (chunk, enc, callback)->
chunk.sales = "[#{chunk.sales}]"
@push chunk
do callback
class EndStream extends WritableStream
constructor: ->
super objectMode: true
_write: (chunk, enc, callback)->
console.log chunk
do callback
process.nextTick ->
filter.emit 'unlock'
endStream = new EndStream()
startStream = through2.obj (chunk, enc, callback)->
@push chunk
do callback
startStream
.pipe(filter)
.pipe(filter2)
.pipe(endStream)
console.log "--- START ---"
setInterval ->
for i in [1..5]
startStream.write
sales_at: (new Date()).getTime()
sales: (Math.random() * 100)^0
, 1000
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment