Skip to content

Instantly share code, notes, and snippets.

@pchw
Last active August 29, 2015 14:13
Show Gist options
  • Save pchw/c3d94ebefda455125df0 to your computer and use it in GitHub Desktop.
Save pchw/c3d94ebefda455125df0 to your computer and use it in GitHub Desktop.
stream test
$ DEBUG=sample coffee stream.coffee
sample EventEmitter onPush +0ms
sample Channel Filter 1 Stream In +1ms
sample Channel Filter 2 Stream In +2ms
sample OutputStream._write +0ms
{
"report": {
"channel1": "channel1",
"channel2": "channel2"
},
"original": {
"_id": "53168af5f3ee8c200800006b",
"updated_at": "2014-03-05T02:24:53.695Z",
"channel": "Other"
}
}
debug = require('debug')('sample')
through2 = require 'through2'
stream = require 'stream'
Transform = stream.Transform
ReadableStream = stream.Readable
WritableStream = stream.Writable
EventEmitter = require('events').EventEmitter
ChannelFilter1 = through2 (chunk, enc, callback)->
debug "Channel Filter 1 Stream In"
obj = JSON.parse chunk
obj.report.channel1 = "channel1"
@push JSON.stringify obj
do callback
ChannelFilter2 = through2 (chunk, enc, callback)->
debug "Channel Filter 2 Stream In"
obj = JSON.parse chunk
obj.report.channel2 = "channel2"
@push JSON.stringify obj
do callback
class SourceStream extends ReadableStream
_read: ->
debug "SourceStream._read"
obj =
report: {}
original:
_id: "53168af5f3ee8c200800006b"
updated_at: "2014-03-05T02:24:53.695Z"
channel: "Other"
@push JSON.stringify obj
@push null
class OutputStream extends WritableStream
_write: (chunk, encoding, callback)->
debug "OutputStream._write"
obj = JSON.parse chunk
console.log JSON.stringify obj, null, 2
do callback
# r = new SourceStream
# r
# .pipe(ChannelFilter1)
# .pipe(ChannelFilter2)
# .pipe(new OutputStream)
ChannelFilter1
.pipe(ChannelFilter2)
.pipe(new OutputStream)
obj =
report: {}
original:
_id: "53168af5f3ee8c200800006b"
updated_at: "2014-03-05T02:24:53.695Z"
channel: "Other"
ev = new EventEmitter
ev.on 'push', (o)->
debug "EventEmitter onPush"
ChannelFilter1.write JSON.stringify o
ev.emit 'push', obj
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment