Skip to content

Instantly share code, notes, and snippets.

@chrisdickinson
Last active December 15, 2015 08:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save chrisdickinson/5233706 to your computer and use it in GitHub Desktop.
Save chrisdickinson/5233706 to your computer and use it in GitHub Desktop.
duplex stream wrapping a line-delimited json protocol
var lines = require('line-stream')
, through = require('through')
, emit = require('emit-function')
, duplex = require('duplex')
module.exports = json
function json(transport) {
var input = through(write)
, output = through(recv)
, stream = duplex()
, error = emit(stream, 'error')
, ls
input.pipe(transport).pipe(ls = lines()).pipe(output)
stream
.on('_data', write)
.on('_end', input.end.bind(input))
input
.on('error', error)
.on('drain', stream.resume.bind(stream))
transport
.on('error', error)
ls.on('error', error)
ls.on('pipe', function(src) {
if(src.setEncoding) {
src.setEncoding('utf8')
}
})
output
.on('error', error)
.on('data', stream._data)
return stream
function write(data) {
input.queue(JSON.stringify(data)+'\n')
}
function recv(data) {
output.queue(JSON.parse(data))
}
}
var lines = require('line-stream')
, through = require('through')
, emit = require('emit-function')
module.exports = json
function json(transport) {
var input = through()
, output = through()
, stream = through(write)
, error = emit(stream, 'error')
, ls
input.pipe(transport).pipe(ls = lines()).pipe(output)
output.on('data', function(d) {
stream.queue(d)
})
input
.on('error', error)
.on('drain', stream.resume.bind(stream))
transport
.on('error', error)
ls.on('error', error)
ls.on('pipe', function(src) {
if(src.setEncoding) {
src.setEncoding('utf8')
}
})
output
.on('error', error)
return stream
function write(data) {
input.queue(JSON.stringify(data)+'\n')
}
function recv(data) {
output.queue(JSON.parse(data))
}
}
var json = require('./index')
, through = require('through')
var example_transport = through(function(d) { this.queue(d) })
, duplexed = json(example_transport)
duplexed.on('data', function(d) {
console.log(d)
})
setInterval(function() {
duplexed.write({time: Date.now(), value: Math.random() * 1000 | 0})
}, 100)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment