Skip to content

Instantly share code, notes, and snippets.

@darelf
Last active August 29, 2015 14:03
Show Gist options
  • Save darelf/0a6e6a01091d64467379 to your computer and use it in GitHub Desktop.
Save darelf/0a6e6a01091d64467379 to your computer and use it in GitHub Desktop.
Trying to figure out stream Transform with protocol buffers
// Protocol-buffers schema shared by the encode and decode transforms below.
var protobuf = require('protocol-buffers')
var schema = protobuf([
  { name: 'msg', type: 'string' },
  { name: 'length', type: 'float' },
  { name: 'id', type: 'float' }
])

// Direct handles on the schema's encode/decode functions.
var encoder = schema.encode
var decoder = schema.decode

// Sample messages written into the pipeline at the end of the script.
var msg1 = { msg: 'commit' }
var msg2 = { msg: 'update', id: 5, length: 21 }
// Transform stream: JSON text in, length-prefixed protocol-buffers frames out.
var encode_transformer = new (require('stream').Transform)()

/**
 * Encodes one JSON-serialized message and emits it as a single frame:
 * a 4-byte big-endian unsigned payload length followed by the payload.
 *
 * Assumes each written chunk holds exactly one complete JSON document
 * (that is how the script at the end of this file writes to it).
 *
 * @param {Buffer|string} chunk - JSON text of one message object.
 * @param {string} enc - Chunk encoding (unused).
 * @param {function(Error=, Buffer=)} cb - Called with an error if the chunk
 *   cannot be parsed/encoded, with the frame on success, or with nothing
 *   when the encoded payload is empty.
 */
encode_transformer._transform = function(chunk, enc, cb) {
  var msg
  try {
    msg = encoder(JSON.parse(chunk))
  } catch (err) {
    // Surface bad input as a stream 'error' instead of throwing out of write().
    cb(err)
    return
  }

  // An empty payload produces no frame at all.
  if (!msg.length) {
    cb()
    return
  }

  // Buffer.alloc replaces the deprecated `new Buffer(size)` constructor.
  var header = Buffer.alloc(4)
  header.writeUInt32BE(msg.length, 0)

  // Passing the frame to the callback pushes it downstream.
  cb(null, Buffer.concat([header, msg]))
}
// Transform stream: length-prefixed protocol-buffers frames in, JSON text out.
var decode_transformer = new (require('stream').Transform)()

/**
 * Accumulates incoming bytes and emits one JSON string per complete frame.
 *
 * A frame is a 4-byte big-endian unsigned payload length followed by that
 * many payload bytes. Chunk boundaries may fall anywhere, including inside
 * the header, so unconsumed bytes are kept in `this.buffer` between calls.
 * `this.workingLength` holds the payload length of the frame currently being
 * waited on, or 0 when no frame header is pending.
 *
 * @param {Buffer} chunk - Next slice of the framed byte stream.
 * @param {string} enc - Chunk encoding (unused).
 * @param {function(Error=)} cb - Called once the chunk has been consumed, or
 *   with an error if a payload fails to decode.
 */
decode_transformer._transform = function(chunk, enc, cb) {
  var HEADER_SIZE = 4

  this.buffer = this.buffer ? Buffer.concat([this.buffer, chunk]) : chunk

  // Only read the length prefix once all four header bytes have arrived;
  // readUInt32BE throws on a shorter buffer.
  while (this.buffer.length >= HEADER_SIZE) {
    this.workingLength = this.buffer.readUInt32BE(0)
    var frameEnd = HEADER_SIZE + this.workingLength

    // The header is not counted in the length prefix, so a frame is complete
    // only when header + payload are both buffered.
    if (this.buffer.length < frameEnd) {
      cb()
      return
    }

    var payload = this.buffer.slice(HEADER_SIZE, frameEnd)
    // Advance past the frame before decoding so a bad frame is never retried.
    this.buffer = this.buffer.slice(frameEnd)

    // The encoder never emits empty payloads; consume a zero-length frame
    // rather than stalling on it.
    if (!payload.length) {
      continue
    }

    var output
    try {
      output = schema.decode(payload)
    } catch (err) {
      cb(err)
      return
    }
    this.push(JSON.stringify(output))
  }

  // Fewer than HEADER_SIZE bytes remain: no frame is pending.
  this.workingLength = 0
  cb()
}
// Log every decoded message as a parsed object.
decode_transformer.on('data', function(serialized) {
  console.log(JSON.parse(serialized))
})

// Feed encoder output straight into the decoder.
encode_transformer.pipe(decode_transformer)

// Push the sample messages through the round trip, in order.
var samples = [msg1, msg2]
samples.forEach(function(sample) {
  encode_transformer.write(JSON.stringify(sample))
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment