Streams work together through two mechanisms:

- Their programmatic API, such as `read()`, `write()`, `on('data')`, etc.
- The format of the data being operated on.
Traditional streams operate on a simple stream of bytes. Ordering matters,
but the start and end of individual `Buffer` objects do not.
In the new streams2 API coming in node 0.10, there are now 'object streams'.
These are streams where `objectMode: true` is passed to the constructor. In
this mode rebuffering is disabled and non-`Buffer` objects can now be streamed.
One problem with object mode is that it suddenly becomes very difficult to build composable streams, because the format of input messages can vary greatly.
This gist proposes a message format convention that might make it easier to build composable object streams.
This convention is based on my experimentation with network protocol streams. Perhaps it only makes sense in that domain. I am looking for feedback to see if it is more broadly useful or if there is a better way to approach the problem.
The core concept of the format is that a message consists of some binary
`Buffer` data and some meta-data. The meta-data may represent information
to be written into the `Buffer` or information that has been read out of
the `Buffer`.
By convention the binary `Buffer` is represented by two properties:

- `data`: The `Buffer` being read from or written to.
- `offset`: The next location to read from or write to. Treated as 0 if
  missing. (This field is used to avoid baking in the penalty of a
  `slice()` call on every message for each stream in the pipeline.)
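To make the benefit of `offset` concrete, here is a small sketch of a downstream parser advancing `offset` rather than allocating a `slice()` at each stage. The `readNextField()` helper is hypothetical, not part of any real module:

```javascript
// Hypothetical helper: read a 16-bit big-endian field at the current
// offset and advance it, instead of slicing a new Buffer each time.
function readNextField(msg) {
  var offset = msg.offset || 0;              // treated as 0 if missing
  var value = msg.data.readUInt16BE(offset);
  msg.offset = offset + 2;                   // advance past consumed bytes
  return value;
}

var msg = { data: new Buffer([0x00, 0x2a, 0x00, 0x07]) };
readNextField(msg);  // returns 42; msg.offset is now 2, no slice() allocated
```

Each stream in the pipeline reads from the shared `Buffer` at the current `offset` and bumps it forward, so the message flows down the pipe without any per-stage copies.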
Meta-data is contained in objects stored in additional properties on the
message. The names of these properties are domain specific. For example,
a stream performing ethernet frame serialization might use `msg.ether` for
its meta-data.
These meta-data properties provide a couple of benefits:

- Some namespace protection so that streams don't conflict with each other.
- A side-channel for communicating between streams that are aware of each
  other. For example, an IP header serialization stream might check the
  `msg.ether.type` value to make sure the message represents an IP packet.
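That side-channel check could look like this sketch, where `isIpPacket()` is a hypothetical guard, not part of any real module:

```javascript
// Hypothetical guard an IP deserialization stream might run before
// parsing: only handle messages the upstream EtherStream tagged as IP.
function isIpPacket(msg) {
  return !!(msg.ether && msg.ether.type === 'ip');
}

var arp = { data: new Buffer(0), ether: { type: 'arp' } };
var ip  = { data: new Buffer(0), ether: { type: 'ip' } };
isIpPacket(arp);  // false -- an IpStream would pass this message through
isIpPacket(ip);   // true  -- safe to parse an IP header at msg.offset
```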
An example message might look like the following:

```javascript
var msg = {
  data: buf,
  offset: 14,
  ether: {
    src: '01:02:03:04:05:06',
    dst: '06:05:04:03:02:01',
    type: 'ip',
    length: 14
  }
};
```
The `object-transform` module provides a base class for implementing a
composable object stream using this approach. Simply extend the
`ObjectTransform` base class and implement the `_reduce()` and `_expand()`
functions.
```javascript
'use strict';

module.exports = EtherStream;

var EtherFrame = require('ether-frame');
var ObjectTransform = require('object-transform');
var util = require('util');

util.inherits(EtherStream, ObjectTransform);

function EtherStream(opts) {
  var self = (this instanceof EtherStream)
           ? this
           : Object.create(EtherStream.prototype);

  opts = opts || {};
  opts.meta = 'ether';

  ObjectTransform.call(self, opts);

  if (self.ether && typeof self.ether.toBuffer !== 'function') {
    throw new Error('Optional ether value must be null or provide ' +
                    'toBuffer() function');
  }

  return self;
}

// Parse ethernet frame meta-data out of the Buffer
EtherStream.prototype._reduce = function(msg, output, callback) {
  msg.ether = new EtherFrame(msg.data, msg.offset);
  msg.offset += msg.ether.length;
  return msg;
};

// Serialize ethernet frame meta-data into the Buffer
EtherStream.prototype._expand = function(ether, msg, output, callback) {
  ether.toBuffer(msg.data, msg.offset);
  msg.ether = ether;
  msg.offset += ether.length;
  return msg;
};
```
Note, the terms `_reduce()` and `_expand()` are used instead of `_read()`
and `_write()` to avoid conflicting with the existing streams2 base API.
I like to read them as "reducing content in the `Buffer`" or "expanding
content in the `Buffer`".
This stream could then be used with other composable streams like this:
```javascript
var input = new EtherStream();
var output = input.pipe(new IpStream())
                  .pipe(new UdpStream())
                  .pipe(new DnsStream());  // DnsStream doesn't exist yet

input.write(rawBuffer);
var msg = output.read();

if (msg.ether.src === expectedMAC && msg.dns.type === 'query') {
  // allow query and respond appropriately
}
```
Or you could put them together as a packet monitoring tool:
```javascript
// Not sure this provides a streams2 API at the moment
var rawSocket = require('raw-socket');

// CallbackStream doesn't exist yet
var monitor = new CallbackStream(function(msg) {
  console.log(msg);
});

rawSocket.pipe(new EtherStream())      // read packet headers
         .pipe(new IpStream())
         .pipe(new UdpStream())
         .pipe(monitor)                // log packet
         .pipe(new UdpStream())        // write packet headers
         .pipe(new IpStream())
         .pipe(new EtherStream())
         .pipe(new BufferizeStream())  // convert back to flat Buffer, doesn't exist yet
         .pipe(rawSocket);
```
Note, not all of the components are there yet for these examples to work, but they show the end goal.
Is this useful? Is there a better way? Am I completely confused?
Please comment here or send me a message on Twitter if you have an opinion.
Also, if you try implementing a stream using this pattern, consider adding
the `composable-object-stream` keyword to your `package.json`. That way
it will be easy to find other streams that can be combined.
Thank you!