Instantly share code, notes, and snippets.

Embed
What would you like to do?
Composable Object Streams

Composable Object Streams

Rationale

Streams work together through two mechanisms:

  1. Their programmatic API, such as read(), write(), on('data'), etc.
  2. The format of the data being operated on.

Traditional streams operate on a simple stream of bytes. Ordering matters, but the start and end of individual Buffer objects does not.

In the new streams2 API coming in node 0.10, there are now 'object streams'. These are streams where objectMode: true is passed to the constructor. In this mode rebuffering is disabled and non-Buffer objects can now be streamed.

One problem with object mode is that suddenly its very difficult to build composable streams because the format of input messages can vary greatly.

This gist proposes a message format convention that might make it easier to build composable object streams.

This convention is based on my experimentation with network protocol streams. Perhaps it only makes sense in that domain. I am looking for feedback to see if it is more broadly useful or if there is a better way to approach the problem.

Message Format

The core concept of the format is that a message consists of some binary Buffer data and some meta-data. The meta-data may represent information to be written into the Buffer or may represent information that has been read out of the Buffer.

By convention the binary Buffer is represented by two properties:

  1. data: The Buffer being read from or written to.
  2. offset: The next location to read from or write to. Treated as 0 if missing. (This field is used to avoiding baking in the penalty of a slice() call on every message for each stream in the pipeline.)

Meta-data is contained in objects stored in additional properties on the message. The name of these properties is domain specific. For example, a stream performing ethernet frame serialization might use msg.ether for its meta-data.

These meta-data properties provide a couple benefits:

  1. Some namespace protection so that streams don't conflict with each other.
  2. A side-channel for communicating between streams that are aware of each other. For example, an IP header serialization stream might check the msg.ether.type value to make sure the message represents an IP packet.

An example message might look like the following:

var msg = {
  data: buf,
  offset: 14,
  ether: {
    src: '01:02:03:04:05:06',
    dst: '06:05:04:03:02:01',
    type: 'ip',
    length: 14
  }
};

Implementation

The object-transform module provides a base class for implementing a composable object stream using this approach. Simply extend the ObjectTransform base class and implement the _reduce() and _expand() functions.

'use strict';

module.exports = EtherStream;

var EtherFrame = require('ether-frame');
var ObjectTransform = require('object-transform');
var util = require('util');

util.inherits(EtherStream, ObjectTransform);

function EtherStream(opts) {
  var self = (this instanceof EtherStream)
           ? this
           : Object.create(EtherStream.prototype);

  opts = opts || {};

  opts.meta = 'ether';

  ObjectTransform.call(self, opts);

  if (self.ether && typeof self.ether.toBuffer !== 'function') {
    throw new Error('Optional ether value must be null or provide ' +
                    'toBuffer() function');
  }

  return self;
}

EtherStream.prototype._reduce = function(msg, output, callback) {
  msg.ether = new EtherFrame(msg.data, msg.offset);
  msg.offset += msg.ether.length;
  return msg;
};

EtherStream.prototype._expand = function(ether, msg, output, callback) {
  ether.toBuffer(msg.data, msg.offset);
  msg.ether = ether;
  msg.offset += ether.length;
  return msg;
};

Note, the terms _reduce() and _expand() are used instead of _read() and _write() to avoid conflicting with the existing streams2 base API. I like to read them as "reducing content in the Buffer" or "expanding content in the Buffer".

Possible Uses

This stream could then be used with other composable streams in like this:

var input = new EtherStream();
var output = input.pipe(new IpStream())
                   .pipe(new UdpStream())
                   .pipe(new DnsStream());    // DnsStream doesn't exist yet

input.write(rawBuffer);
var msg = output.read();

if (msg.ether.src === expectedMAC && msg.dns.type === 'query') {
  // allow query and respond appropriately
}

Or you could put them together as packet monitoring tool:

// Not sure this provides a streams2 API at the moment
var rawSocket = require('raw-socket');

// CallbackStream doesn't exist yet
var monitor = new CallbackStream(function(msg) {
  console.log(msg);
});

rawSocket.pipe(new EtherStream())     // read packet headers
         .pipe(new IpStream())
         .pipe(new UdpStream())
         .pipe(monitor)               // log packet
         .pipe(new UdpStream())       // write packet headers
         .pipe(new IpStream())
         .pipe(new EtherStream())
         .pipe(new BufferizeStream()) // convert back to flat Buffer, does not exist yet
         .pipe(rawSocket);

Note, not all the components are there yet for these examples to work, but it shows the end goal.

Feedback and Next Steps

Is this useful? Is there a better way? Am I completely confused?

Please comment here or send me a message on twitter if you have an opinion.

Also, if you try implementing a stream using this pattern, consider adding the composable-object-stream keyword to your package.json. That way it will be easy to find other streams that can be combined:

Streams using this pattern.

Thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment