Skip to content

Instantly share code, notes, and snippets.

@sorccu
Last active September 2, 2019 07:54
Show Gist options
  • Save sorccu/63cdb5cb55df46e26a17 to your computer and use it in GitHub Desktop.
Save sorccu/63cdb5cb55df46e26a17 to your computer and use it in GitHub Desktop.
Delimited ProtoBuf.js streams.
var util = require('util')
var stream = require('stream')
// A length prefix is read as a varint32, which occupies at most five bytes
// (7 payload bits per byte).
var MAX_LENGTH_PREFIX_BYTES = 5

/**
 * Transform stream that splits a byte stream of length-delimited messages
 * into individual messages.
 *
 * Each message on the wire is preceded by its byte length encoded as a
 * base-128 varint (least significant group first, high bit set on every byte
 * except the last). One chunk is pushed downstream per complete message;
 * incomplete messages are held back until the remaining bytes arrive.
 *
 * @constructor
 */
function DelimitedStream() {
  stream.Transform.call(this)
  // Length of the message currently being decoded / awaited.
  this._length = 0
  // Position of the next length-prefix byte within the varint.
  this._lengthIndex = 0
  // True while consuming the length prefix, false while awaiting the payload.
  this._readingLength = true
  // Bytes received so far that have not been consumed yet.
  this._buffer = Buffer.alloc(0)
}

util.inherits(DelimitedStream, stream.Transform)

/**
 * Consumes one incoming chunk and pushes every message it completes.
 *
 * @param {Buffer} chunk - Newly received bytes.
 * @param {string} encoding - Unused; chunks are treated as raw bytes.
 * @param {function(Error=)} done - Called once the chunk has been processed.
 *   Receives an Error when a length prefix is longer than five bytes.
 */
DelimitedStream.prototype._transform = function(chunk, encoding, done) {
  var pending = Buffer.concat([this._buffer, chunk])
  var offset = 0

  while (offset < pending.length) {
    if (this._readingLength) {
      var byte = pending[offset]
      offset += 1
      // Multiply instead of shifting: `<<` works on signed 32-bit integers,
      // so a fifth prefix byte could otherwise produce a negative length.
      this._length += (byte & 0x7f) * Math.pow(2, 7 * this._lengthIndex)
      if (byte & 0x80) {
        this._lengthIndex += 1
        if (this._lengthIndex >= MAX_LENGTH_PREFIX_BYTES) {
          // The continuation bit is still set after five bytes: the input is
          // not a valid varint32 and the framing cannot be recovered.
          this._buffer = pending.slice(offset)
          done(new Error('Invalid length prefix: varint32 is longer than ' +
            MAX_LENGTH_PREFIX_BYTES + ' bytes'))
          return
        }
      }
      else {
        this._lengthIndex = 0
        this._readingLength = false
      }
    }
    else {
      if (pending.length - offset < this._length) {
        // Wait for more chunks
        break
      }
      this.push(pending.slice(offset, offset + this._length))
      offset += this._length
      this._length = 0
      this._readingLength = true
    }
  }

  // Keep whatever has not been consumed for the next call.
  this._buffer = pending.slice(offset)
  done()
}
module.exports.DelimitedStream = DelimitedStream
/**
 * Transform stream that prefixes every chunk written to it with the chunk's
 * byte length, encoded as a base-128 varint (least significant group first,
 * high bit set on every byte except the last).
 *
 * For each input chunk two chunks are pushed downstream: the length prefix,
 * then the original chunk unchanged.
 *
 * @constructor
 */
function DelimitingStream() {
  stream.Transform.call(this)
}

util.inherits(DelimitingStream, stream.Transform)

/**
 * Emits the varint length prefix for `chunk`, followed by `chunk` itself.
 *
 * @param {Buffer} chunk - Message to delimit.
 * @param {string} encoding - Unused; chunks are treated as raw bytes.
 * @param {function(Error=)} done - Called once both parts have been pushed.
 */
DelimitingStream.prototype._transform = function(chunk, encoding, done) {
  var remaining = chunk.length
  var lengthBytes = []

  // Use arithmetic instead of `>>`: bitwise shifts coerce to signed 32-bit
  // integers and would mangle lengths of 2^31 and above.
  while (remaining > 0x7f) {
    lengthBytes.push(0x80 + (remaining % 0x80))
    remaining = Math.floor(remaining / 0x80)
  }
  lengthBytes.push(remaining)

  this.push(Buffer.from(lengthBytes))
  this.push(chunk)
  done()
}
module.exports.DelimitingStream = DelimitingStream
var sinon = require('sinon')
var chai = require('chai')
chai.use(require('sinon-chai'))
var expect = chai.expect
var ms = require('./messagestream')
// Test suite for the varint-delimited stream pair exported by ./messagestream.
//
// Assertions about emitted 'data' events are made from a setImmediate
// callback (the same approach the "end" tests use) so that they hold whether
// the stream delivers 'data' in the same tick as write() or on a later one.
describe('MessageStream', function() {
  describe('DelimitedStream', function() {
    it('should emit complete varint-delimited chunks', function(done) {
      var ds = new ms.DelimitedStream()
      var spy = sinon.spy()
      ds.on('data', spy)
      // Two frames: length 1 ("a") and length 2 ("bc").
      ds.write(Buffer.from([1, 0x61, 2, 0x62, 0x63]))
      setImmediate(function() {
        expect(spy).to.have.been.calledTwice
        expect(spy.firstCall.args).to.eql([Buffer.from([0x61])])
        expect(spy.secondCall.args).to.eql([Buffer.from([0x62, 0x63])])
        done()
      })
    })

    it('should wait for more data', function(done) {
      var ds = new ms.DelimitedStream()
      var spy = sinon.spy()
      ds.on('data', spy)
      // Length prefix of 3, then the payload one byte at a time.
      ds.write(Buffer.from([3]))
      expect(spy).to.not.have.been.called
      ds.write(Buffer.from([0x66]))
      expect(spy).to.not.have.been.called
      ds.write(Buffer.from([0x65]))
      expect(spy).to.not.have.been.called
      setImmediate(function() {
        // Still nothing after pending stream work has had a chance to run.
        expect(spy).to.not.have.been.called
        ds.write(Buffer.from([0x64]))
        setImmediate(function() {
          expect(spy).to.have.been.calledOnce
          expect(spy.firstCall.args).to.eql([Buffer.from([0x66, 0x65, 0x64])])
          done()
        })
      })
    })

    it('should read varint32 properly', function(done) {
      var ds = new ms.DelimitedStream()
      var spy = sinon.spy()
      ds.on('data', spy)
      ds.write(Buffer.from([172, 2])) // 300
      var data = Buffer.alloc(300)
      ds.write(data)
      setImmediate(function() {
        expect(spy).to.have.been.calledOnce
        expect(spy.firstCall.args).to.eql([data])
        done()
      })
    })

    it('should emit "end"', function(done) {
      var ds = new ms.DelimitedStream()
      var spy = sinon.spy()
      ds.on('data', sinon.spy())
      ds.on('end', spy)
      ds.write(Buffer.from([1]))
      ds.end()
      setImmediate(function() {
        expect(spy).to.have.been.called
        done()
      })
    })
  })

  describe('DelimitingStream', function() {
    it('should add delimiter chunks to stream', function(done) {
      var ds = new ms.DelimitingStream()
      var spy = sinon.spy()
      ds.on('data', spy)
      ds.write(Buffer.from([0x66, 0x6f, 0x6f]))
      setImmediate(function() {
        // First the length prefix, then the untouched payload.
        expect(spy).to.have.been.calledTwice
        expect(spy.firstCall.args).to.eql([Buffer.from([0x03])])
        expect(spy.secondCall.args).to.eql([Buffer.from([0x66, 0x6f, 0x6f])])
        done()
      })
    })

    it('should write proper varints', function(done) {
      var ds = new ms.DelimitingStream()
      var spy = sinon.spy()
      ds.on('data', spy)
      var data = Buffer.alloc(300)
      ds.write(data)
      setImmediate(function() {
        expect(spy).to.have.been.calledTwice
        // 300 encoded as a two-byte varint.
        expect(spy.firstCall.args).to.eql([Buffer.from([172, 2])])
        expect(spy.secondCall.args).to.eql([data])
        done()
      })
    })

    it('should emit "end"', function(done) {
      var ds = new ms.DelimitingStream()
      var spy = sinon.spy()
      ds.on('data', sinon.spy())
      ds.on('end', spy)
      ds.write(Buffer.from([1]))
      ds.end()
      setImmediate(function() {
        expect(spy).to.have.been.called
        done()
      })
    })
  })
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment