Last active
September 2, 2019 07:54
-
-
Save sorccu/63cdb5cb55df46e26a17 to your computer and use it in GitHub Desktop.
Delimited ProtoBuf.js streams.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var util = require('util') | |
var stream = require('stream') | |
/**
 * Transform stream that splits a byte stream of varint-delimited messages
 * (the framing used for delimited ProtoBuf.js messages) into one output
 * chunk per message. The length prefix itself is not emitted.
 *
 * State kept between chunks:
 *   _length        - length of the message currently being read
 *   _lengthIndex   - index of the next byte of the length prefix
 *   _readingLength - true while the length prefix is still being decoded
 *   _buffer        - bytes received but not yet consumed
 */
function DelimitedStream() {
  stream.Transform.call(this)
  this._length = 0
  this._lengthIndex = 0
  this._readingLength = true
  // Buffer.alloc replaces the deprecated `new Buffer(size)` constructor.
  this._buffer = Buffer.alloc(0)
}

util.inherits(DelimitedStream, stream.Transform)

/**
 * Maximum number of bytes accepted for a single length prefix (varint32).
 */
DelimitedStream.MAX_LENGTH_BYTES = 5

/**
 * Consumes a chunk, emitting every complete message it finishes. Incomplete
 * data (a partial length prefix or a partial message) is kept until the next
 * chunk arrives.
 *
 * @param {Buffer} chunk - Incoming bytes.
 * @param {string} encoding - Ignored; chunks are handled as Buffers.
 * @param {function(Error=)} done - Called once the chunk has been consumed,
 *   or with an Error if the length prefix is longer than
 *   DelimitedStream.MAX_LENGTH_BYTES bytes.
 */
DelimitedStream.prototype._transform = function(chunk, encoding, done) {
  var pending = Buffer.concat([this._buffer, chunk])
  var offset = 0

  while (offset < pending.length) {
    if (this._readingLength) {
      var byte = pending[offset]
      offset += 1

      // Multiply instead of shifting: `<<` works on signed 32-bit integers,
      // so the fifth prefix byte could turn the length negative.
      this._length += (byte & 0x7f) * Math.pow(2, 7 * this._lengthIndex)

      if (byte & 0x80) {
        // Continuation bit set: more length bytes follow.
        this._lengthIndex += 1
        if (this._lengthIndex >= DelimitedStream.MAX_LENGTH_BYTES) {
          this._buffer = Buffer.alloc(0)
          return done(new Error(
            'Length prefix is longer than ' +
            DelimitedStream.MAX_LENGTH_BYTES + ' bytes'
          ))
        }
      }
      else {
        this._lengthIndex = 0
        this._readingLength = false
      }
    }
    else {
      var end = offset + this._length
      if (end > pending.length) {
        // Wait for more chunks
        break
      }
      this.push(pending.slice(offset, end))
      offset = end
      this._length = 0
      this._readingLength = true
    }
  }

  // Keep only the bytes that have not been consumed yet.
  this._buffer = pending.slice(offset)
  done()
}
module.exports.DelimitedStream = DelimitedStream | |
/**
 * Transform stream that prefixes every chunk written to it with the chunk's
 * byte length encoded as a varint. The prefix and the chunk are pushed as two
 * separate output chunks.
 */
function DelimitingStream() {
  stream.Transform.call(this)
}

util.inherits(DelimitingStream, stream.Transform)

/**
 * Emits the varint-encoded length of `chunk`, followed by `chunk` itself.
 *
 * @param {Buffer} chunk - Message to delimit.
 * @param {string} encoding - Ignored; chunks are handled as Buffers.
 * @param {function(Error=)} done - Called after both chunks were pushed.
 */
DelimitingStream.prototype._transform = function(chunk, encoding, done) {
  var remaining = chunk.length
  var lengthBytes = []

  // Emit 7 bits per byte, least significant group first, with the high bit
  // set on every byte except the last. Arithmetic division is used instead of
  // `>>` because bitwise shifts coerce to signed 32-bit integers and would
  // corrupt lengths of 2^31 and above.
  while (remaining > 0x7f) {
    lengthBytes.push(0x80 | (remaining % 0x80))
    remaining = Math.floor(remaining / 0x80)
  }
  lengthBytes.push(remaining)

  // Buffer.from replaces the deprecated `new Buffer(array)` constructor.
  this.push(Buffer.from(lengthBytes))
  this.push(chunk)
  done()
}
module.exports.DelimitingStream = DelimitingStream |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var sinon = require('sinon') | |
var chai = require('chai') | |
chai.use(require('sinon-chai')) | |
var expect = chai.expect | |
var ms = require('./messagestream') | |
// Test suite for the varint-delimited message streams. Buffers are created
// with Buffer.from / Buffer.alloc instead of the deprecated `new Buffer(...)`
// constructor; Buffer.alloc returns zero-filled memory, so no explicit fill
// is needed.
describe('MessageStream', function() {
  describe('DelimitedStream', function() {
    it('should emit complete varint-delimited chunks', function() {
      var ds = new ms.DelimitedStream()
      var spy = sinon.spy()
      ds.on('data', spy)
      ds.write(Buffer.from([1, 0x61, 2, 0x62, 0x63]))
      expect(spy).to.have.been.calledTwice
      expect(spy.firstCall.args).to.eql([Buffer.from([0x61])])
      expect(spy.secondCall.args).to.eql([Buffer.from([0x62, 0x63])])
    })

    it('should wait for more data', function() {
      var ds = new ms.DelimitedStream()
      var spy = sinon.spy()
      ds.on('data', spy)
      ds.write(Buffer.from([3]))
      expect(spy).to.not.have.been.called
      ds.write(Buffer.from([0x66]))
      expect(spy).to.not.have.been.called
      ds.write(Buffer.from([0x65]))
      expect(spy).to.not.have.been.called
      ds.write(Buffer.from([0x64]))
      expect(spy).to.have.been.calledOnce
      expect(spy.firstCall.args).to.eql([Buffer.from([0x66, 0x65, 0x64])])
    })

    it('should read varint32 properly', function() {
      var ds = new ms.DelimitedStream()
      var spy = sinon.spy()
      ds.on('data', spy)
      ds.write(Buffer.from([172, 2])) // 300
      var data = Buffer.alloc(300)
      ds.write(data)
      expect(spy).to.have.been.calledOnce
      expect(spy.firstCall.args).to.eql([data])
    })

    it('should emit "end"', function(done) {
      var ds = new ms.DelimitedStream()
      var spy = sinon.spy()
      // A 'data' listener is attached so the stream flows and can end.
      ds.on('data', sinon.spy())
      ds.on('end', spy)
      ds.write(Buffer.from([1]))
      ds.end()
      setImmediate(function() {
        expect(spy).to.have.been.called
        done()
      })
    })
  })

  describe('DelimitingStream', function() {
    it('should add delimiter chunks to stream', function() {
      var ds = new ms.DelimitingStream()
      var spy = sinon.spy()
      ds.on('data', spy)
      ds.write(Buffer.from([0x66, 0x6f, 0x6f]))
      expect(spy).to.have.been.calledTwice
      expect(spy.firstCall.args).to.eql([Buffer.from([0x03])])
      expect(spy.secondCall.args).to.eql([Buffer.from([0x66, 0x6f, 0x6f])])
    })

    it('should write proper varints', function() {
      var ds = new ms.DelimitingStream()
      var spy = sinon.spy()
      ds.on('data', spy)
      var data = Buffer.alloc(300)
      ds.write(data)
      expect(spy).to.have.been.calledTwice
      expect(spy.firstCall.args).to.eql([Buffer.from([172, 2])])
      expect(spy.secondCall.args).to.eql([data])
    })

    it('should emit "end"', function(done) {
      var ds = new ms.DelimitingStream()
      var spy = sinon.spy()
      // A 'data' listener is attached so the stream flows and can end.
      ds.on('data', sinon.spy())
      ds.on('end', spy)
      ds.write(Buffer.from([1]))
      ds.end()
      setImmediate(function() {
        expect(spy).to.have.been.called
        done()
      })
    })
  })
})
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment