@bodokaiser
Last active December 15, 2015 17:59
This stream example reads incoming bytes as two-byte packages. It should cache buffers until they reach a length of two, and it should call itself again to handle buffers that hold multiple packages. The first goal, caching bytes until there are two, works great, but the second goal, where it should reread cached buffers which contain more t…
var util = require('util');
var stream = require('stream');
var parser = new stream.Readable();
var source = new stream.Readable();
parser.cache = [];
parser.source = source;
parser._read = function() {
  var chunk = source.read();
  if (chunk === null)
    return this.push('');
  // add the bytes of chunk to our cache
  var offset = this.cache.length;
  for (var i = 0; i < chunk.length; i++)
    this.cache[offset + i] = chunk[i];
  // if we have something in the cache,
  // create a buffer from it and overwrite chunk
  if (this.cache.length)
    chunk = new Buffer(this.cache);
  // if chunk is longer than one byte we
  // can slice a two-byte package from it,
  // else we have to wait for more bytes
  if (chunk.length > 1) {
    // the body is two bytes long
    var body = chunk.slice(0, 2);
    // everything after it must belong to another package
    var remnant = chunk.slice(2);
    // push the body to the read queue
    this.push(body);
    // reset the cache
    this.cache = [];
    // if there is a remaining part,
    // push it to the cache and call
    // this.push('') to keep reading
    // (which actually does not work)
    if (remnant.length) {
      this.cache.push(remnant);
      this.push('');
    }
  } else {
    this.push('');
  }
};
source._read = function() {};
source.on('readable', function() {
  parser.read(0);
});
parser.on('readable', function() {
  console.log('reading', parser.read());
});
// can handle single bytes
source.push(new Buffer([0x01]));
source.push(new Buffer([0x02]));
source.push(new Buffer([0x03]));
source.push(new Buffer([0x04]));
// cannot handle chunked bytes
// parser will stop reading after 0x05, 0x06
// so that 0x07, 0x08 will not get handled
source.push(new Buffer([0x05, 0x06, 0x07, 0x08]));

arhart commented Apr 4, 2013

You may want to consider:

  1. chunk === null doesn't imply the cache is empty.
  2. new Buffer(array) expects an array of octets, but this code passes a combination of buffers and octets.
  3. Buffer.concat(array) takes a possibly empty array of buffers, so you could just push the new chunk (if there is one) directly onto the cache and combine them without iterating over the new chunk.
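The three points above could be combined roughly as follows. This is only a minimal sketch, not the gist author's code: `createPackageCache` and `feed` are hypothetical names introduced for illustration, and `Buffer.from` is used in place of the now-deprecated `new Buffer`. The cache holds buffers only, a `null` chunk still lets leftover bytes be examined, and `Buffer.concat` joins everything without iterating over individual octets.

```javascript
// Hypothetical helper (not part of the gist): accumulates chunks and
// slices off every complete package of `size` bytes.
function createPackageCache(size) {
  var cache = []; // holds Buffers only, never raw octets

  return function feed(chunk) {
    // chunk may be null when the source has nothing new;
    // the cache can still hold leftover bytes in that case
    if (chunk !== null && chunk.length) cache.push(chunk);

    // Buffer.concat accepts an empty array and returns an empty Buffer
    var data = Buffer.concat(cache);
    var packages = [];
    var offset = 0;

    // loop so that one large chunk yields several packages
    while (data.length - offset >= size) {
      packages.push(data.slice(offset, offset + size));
      offset += size;
    }

    // keep the incomplete remainder as a single Buffer
    cache = offset < data.length ? [data.slice(offset)] : [];
    return packages;
  };
}

var feed = createPackageCache(2);
var first = feed(Buffer.from([0x01]));                    // not enough bytes yet
var second = feed(Buffer.from([0x02, 0x03, 0x04, 0x05])); // two packages, one byte left over
var third = feed(null);                                   // nothing new, leftover byte stays cached
```

Inside `_read`, each returned package could then be passed to `this.push()`; the loop is what lets a four-byte chunk such as `0x05, 0x06, 0x07, 0x08` produce both of its packages.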


isaacs commented Apr 5, 2013

It'd be better to use a Transform stream for this, and pipe the source into it. That's what Transform is for.

I don't get what this is actually supposed to be doing, though... So you are just getting chunks, and passing them through in 2-byte increments? Why not just push the whole thing? Or at least manipulate it in some way? In what sense is this a "parser"?
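For reference, the Transform approach suggested above might look something like the sketch below. It is an illustration under assumptions rather than a definitive implementation: `createPackageStream` is a hypothetical name, a `PassThrough` stands in for the real source, and a trailing odd byte is simply dropped when the stream ends.

```javascript
var stream = require('stream');

// Hypothetical factory (not part of the gist): a Transform that re-chunks
// its input into packages of `size` bytes (two by default).
function createPackageStream(size) {
  size = size || 2;
  var remnant = Buffer.alloc(0); // bytes left over from earlier chunks

  return new stream.Transform({
    transform: function(chunk, encoding, done) {
      var data = Buffer.concat([remnant, chunk]);
      var offset = 0;

      // emit every complete package contained in the combined data
      while (data.length - offset >= size) {
        this.push(data.slice(offset, offset + size));
        offset += size;
      }

      // an incomplete tail waits for the next chunk; in this sketch
      // it is discarded if the stream ends first
      remnant = data.slice(offset);
      done();
    }
  });
}

// A PassThrough stands in for the real source here (an assumption).
var source = new stream.PassThrough();
var parser = createPackageStream(2);
var received = [];

source.pipe(parser);
parser.on('data', function(pkg) {
  received.push(Array.from(pkg));
  console.log('reading', pkg);
});

source.write(Buffer.from([0x01]));
source.write(Buffer.from([0x02]));
source.write(Buffer.from([0x05, 0x06, 0x07, 0x08]));
source.end();
```

With a Transform, the stream machinery handles backpressure and scheduling, so there is no need for `push('')` or `read(0)` to keep the parser going.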
