Min Streams Interface Spec

This spec describes a minimal stream interface meant for protocol implementors. Modules written to the interfaces in this spec can be used by a wide variety of projects. The protocol modules need no extra dependencies of their own; there is only an interface to implement.

Simple Stream

A simple stream is just a function. It represents a pull-stream. It has the following signature:

// Implementing a stream.
function read(abort, callback) {
  // abort is a flag that signals to the source that you're done with the stream.
  // If abort is truthy and you're a filter, you usually want to pass it on the next time you call the source's read.
  // If you're a source, stop reading and clean up. Call the callback with an end event when done.

  // To encode an item, pass a falsy value as the first parameter and the item as the second.
  callback(null, item);

  // To encode the end of the stream, use the special `undefined` value for item.
  callback(null, undefined);
  // or simply
  callback();

  // To emit an error, pass a truthy value as the first parameter to the callback.
  callback(new Error("Oops, the monkey got out and ate all the bananas."));
}

// Consuming a stream
read(null, function (err, data) {
  // I got data!
});

The callback may be called before the read function returns (a sync callback, as in Array.prototype.forEach), but it won't always be sync, because the source may depend on non-blocking I/O. If you loop over data recursively, take this into account so you don't blow your stack.
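
One way to handle both cases is a loop that notices when the callback fired synchronously and continues iterating instead of recursing. The following is only a sketch; `consume` and `countTo` are names invented for this example and are not part of the spec.

```javascript
// Illustrative source: emits 1..n synchronously, then the end event.
function countTo(n) {
  var i = 0;
  return function read(abort, callback) {
    if (abort || i >= n) return callback();
    callback(null, ++i);
  };
}

// Illustrative consumer: reads until end or error without growing the stack.
function consume(read, onItem, onEnd) {
  var reading = false; // true while we are inside a call to read()
  var again = false;   // set when the callback fired synchronously

  function onRead(err, item) {
    if (item === undefined) return onEnd(err); // end of stream or error
    onItem(item);
    if (reading) again = true; // sync callback: let the loop below continue
    else next();               // async callback: start a fresh loop
  }

  function next() {
    do {
      again = false;
      reading = true;
      read(null, onRead);
      reading = false;
    } while (again);
  }

  next();
}

var total = 0;
var ended = false;
consume(countTo(100000), function (n) { total += n; }, function (err) { ended = !err; });
```

A naive version that calls read again from inside its own callback would nest one stack frame per item for a fully synchronous source like `countTo`.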

Note: Dominic Tarr's pull-streams have a slightly different encoding for end events. Use an adapter for interop.
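
As a rough illustration of such an adapter: this sketch assumes that pull-stream callbacks receive `true` as their first argument for a normal end and an error object for a failure. Check that against the pull-stream documentation before relying on it. `fromPullStream`, `toPullStream` and `pullValues` are names made up for this example.

```javascript
// Assumed pull-stream encoding: cb(true) = end, cb(error) = failure, cb(null, data) = item.
// Encoding in this spec:        cb() = end,     cb(error) = failure, cb(null, item) = item.

// Wrap a pull-stream style read function so it follows this spec.
function fromPullStream(read) {
  return function (abort, callback) {
    read(abort, function (end, data) {
      if (end === true) return callback(); // normal end
      if (end) return callback(end);       // error
      callback(null, data);
    });
  };
}

// Wrap a read function from this spec so it follows the assumed pull-stream encoding.
function toPullStream(read) {
  return function (abort, callback) {
    read(abort, function (err, item) {
      if (err) return callback(err);
      if (item === undefined) return callback(true);
      callback(null, item);
    });
  };
}

// Illustrative source using the assumed pull-stream encoding.
function pullValues(array) {
  var i = 0;
  return function (abort, cb) {
    if (abort) return cb(abort);
    if (i >= array.length) return cb(true);
    cb(null, array[i++]);
  };
}

var seen = [];
var adapted = fromPullStream(pullValues(["a", "b"]));
function record(err, item) { seen.push([err || null, item]); }
adapted(null, record);
adapted(null, record);
adapted(null, record);
```

One caveat of this approach: a pull-stream that emits `undefined` as a data value would look like an end event after adaptation.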

Filters

A protocol that translates between two stream formats is called a filter. Input and output events have a many-to-many relationship: each input event may produce zero or more output events.

Pull Filter

The easiest filter to understand is a pull filter. Modules implementing this spec export a function that accepts a read function and returns a new read function.

// Implementing a pull filter
// input: strings
// output: uppercase strings
function toupper(read) {
  return function (abort, callback) {
    // Forward abort through, we don't need to mess with it.
    read(abort, function (err, data) {
      // Forward errors and end of stream events through as well.
      if (data === undefined) {
        return callback(err);
      }
      callback(null, data.toUpperCase());
    });
  };
}

// Using the filter
var words = createWords(); // An imaginary stream source that emits strings
var uppercaseWords = toupper(words);
// Now consume uppercaseWords instead of words; it emits the same strings in uppercase.
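
To make the usage concrete, here is a small self-contained sketch. `createWords` is only a stand-in for the imaginary source above (here it emits a fixed list), and `toupper` is repeated so the example runs on its own.

```javascript
// Stand-in source: emits a fixed list of words, then the end event.
function createWords() {
  var words = ["min", "streams"];
  var i = 0;
  return function read(abort, callback) {
    if (abort || i >= words.length) return callback();
    callback(null, words[i++]);
  };
}

// The pull filter from the spec, repeated here.
function toupper(read) {
  return function (abort, callback) {
    read(abort, function (err, data) {
      if (data === undefined) return callback(err);
      callback(null, data.toUpperCase());
    });
  };
}

var results = [];
var uppercaseWords = toupper(createWords());

// Keep reading until the end event (or an error) arrives.
// Plain recursion is fine for two items; long sync streams need a loop.
(function next() {
  uppercaseWords(null, function (err, word) {
    if (word === undefined) return;
    results.push(word);
    next();
  });
})();
```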

Push Filter

The easiest filter to implement, especially when the protocol events are many-to-many, is the push filter. With this interface you are freed from worrying about abort messages and back-pressure. Just describe the core protocol logic; generic helper functions can later convert it to a proper pull filter with proper back-pressure and abort forwarding.

// Creating a push filter
// input: byte arrays
// output: individual bytes
function splitBytes(emit) {
  return function (err, item) {
    // Forward errors and END events through.
    if (item === undefined) {
      return emit(err);
    }
    // Emit the items one at a time;
    for (var i = 0, l = item.length; i < l; i++) {
      emit(null, item[i]);
    }
  }
}

If your module is best described using a push filter, don't depend on some conversion library and export a pull filter. Simply export this function and document that it implements the push filter interface. Then people using your library can decide if they want to wrap it.
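
A wrapper of the kind mentioned here could look roughly like the following. This is a minimal sketch, not the actual conversion helper: `pushToPull` and `arraySource` are illustrative implementations, and the sketch assumes the push filter always forwards end and error events.

```javascript
// Illustrative source: emits each element of an array, then the end event.
function arraySource(items) {
  var i = 0;
  return function read(abort, callback) {
    if (abort || i >= items.length) return callback();
    callback(null, items[i++]);
  };
}

// The push filter from the spec, repeated here.
function splitBytes(emit) {
  return function (err, item) {
    if (item === undefined) return emit(err);
    for (var i = 0, l = item.length; i < l; i++) {
      emit(null, item[i]);
    }
  };
}

// Sketch of a push-to-pull wrapper (assumed behavior, not a published API).
function pushToPull(pushFilter) {
  return function (read) {
    var queue = []; // buffered [err, item] events emitted by the push filter
    var write = pushFilter(function (err, item) {
      queue.push([err, item]);
    });
    return function pull(abort, callback) {
      // Abort goes straight through to the source.
      if (abort) return read(abort, callback);
      if (queue.length) {
        var event = queue.shift();
        return callback(event[0], event[1]);
      }
      // Back-pressure: upstream is only read when nothing is buffered.
      // Note this recurses once per input that produces no output.
      read(null, function (err, item) {
        write(err, item);
        pull(null, callback);
      });
    };
  };
}

var bytes = [];
var readByte = pushToPull(splitBytes)(arraySource([[1, 2], [], [3]]));
(function next() {
  readByte(null, function (err, byte) {
    if (byte === undefined) return; // end of stream or error
    bytes.push(byte);
    next();
  });
})();
```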

If several push filters are chained in a row, the functions are directly composable without first being converted to pull filters. This removes several layers of complexity and buffering, and keeps things fast.
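
Direct composition might be sketched as follows. `chain` and `dropZeros` are hypothetical names for this example. Because a push filter receives the downstream emit function and returns the handler for incoming events, the first stage ends up as the outermost call.

```javascript
// The push filter from the spec, repeated here.
function splitBytes(emit) {
  return function (err, item) {
    if (item === undefined) return emit(err);
    for (var i = 0, l = item.length; i < l; i++) {
      emit(null, item[i]);
    }
  };
}

// Hypothetical second push filter: drops zero bytes, forwards everything else.
function dropZeros(emit) {
  return function (err, item) {
    if (item === undefined) return emit(err);
    if (item !== 0) emit(null, item);
  };
}

// Compose two push filters: events pass through `first`, then `second`.
function chain(first, second) {
  return function (emit) {
    return first(second(emit));
  };
}

var output = [];
var write = chain(splitBytes, dropZeros)(function (err, item) {
  output.push(item); // undefined marks the end event
});
write(null, [1, 0, 2]);
write(null, [0, 3]);
write(); // end of stream
```

No queue sits between the two stages; each emitted byte is handed to the next filter by a plain function call.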

Sources and Sinks

A source is a read stream that wraps some event source such as a file stream, a TCP socket, or even user input in a GUI application. The interface is identical to all other streams.

A sink wraps some writable stream. Instead of exposing a write function, it is a consume function that accepts a readable stream.

The main concept here is that there are no writable streams or duplex streams. A duplex stream like a TCP socket is represented by a source and sink pair.
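
A toy source and sink pair could look like this. It is a sketch with in-memory stand-ins: `arraySource`, `collectSink` and the `socket` object are invented for illustration and wrap no real I/O.

```javascript
// Illustrative source: emits each element of an array, then the end event.
function arraySource(items) {
  var i = 0;
  return function read(abort, callback) {
    if (abort || i >= items.length) return callback();
    callback(null, items[i++]);
  };
}

// Illustrative sink: a consume function that pulls from the stream it is
// handed until end or error, then reports what it collected.
function collectSink(done) {
  return function consume(read) {
    var items = [];
    (function next() {
      read(null, function (err, item) {
        if (item === undefined) return done(err, items);
        items.push(item);
        next();
      });
    })();
  };
}

// A "duplex" stream is just the pair.
var received;
var socket = {
  source: arraySource(["ping", "pong"]),
  sink: collectSink(function (err, items) { received = items; })
};

// Echo: hand the socket's own source to its sink. The sink drives the reads.
socket.sink(socket.source);
```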

For example, a TCP server that accepts connections encoded with message framing and msgpack serialization would look like:

TCP Source -> Message DeFramer -> Msgpack Decoder -> APP -> Msgpack Encoder -> Message Framer -> TCP Sink

The code to wire this up may look like:

var frameFilter = require('frame-filter');
// frameFilter.decode - push filter (byte chunks -> messages)
// frameFilter.encode - push filter (messages -> byte chunks)

var msgpack = require('msgpack-js');
// msgpack.decode = map function [buffer -> object]
// msgpack.encode = map function [object -> buffer]

// Pull in a couple generic helpers for converting between the formats.
var pushToPull = require('push-to-pull-adapter');
// pushToPull adapter function (push-filter -> pull-filter)
var mapToPush = require('map-to-push-adapter');
// mapToPush adapter function (map function -> push-filter)

// Since push-filters are composable, we only need to convert the map function for msgpack into push filters and chain them all.
// We could have as easily used pull-filters since the events are 1:1, but all our other filters are push style so this fits better.
var msgpackEncode = mapToPush(msgpack.encode);
var msgpackDecode = mapToPush(msgpack.decode);

// This app gets object events and emits new object events.
// It implements some application-level protocol on top
// of the msgpack and framing protocols.
var app = require('msgpack-app');
// app - push filter (object -> object)

// The entire pull stream looks like:
// sink <- frame <- encode <- app <- decode <- deframe <- source
// First let's combine all our push filters into a single push filter.
// pushFilter = frame <- encode <- app <- decode <- deframe
// A push filter takes the downstream emit function and returns the handler for
// incoming events, so the first stage (deframe) is the outermost call.
function pushFilter(emit) {
  return frameFilter.decode(msgpackDecode(app(msgpackEncode(frameFilter.encode(emit)))));
}
// source and sink are pull-style, so we need to adapt the combined pushFilter and stick it in the middle.
var pullFilter = pushToPull(pushFilter);

// Our module is an onConnect handler that expects a source and a sink representing the duplex socket
module.exports = function (source, sink) {
  // When a new stream is found, we just plug it up.
  // The sink starts the whole process when it's handed a stream.
  sink(pullFilter(source));
};

For maximum reuse, we could instead have exported pushFilter directly and consumed it from a separate module, such as the TCP handler.
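
The map-to-push helper used in the wiring code is not specified here. A plausible sketch, assuming it wraps a plain map function as a 1:1 push filter and turns thrown exceptions into error events, is shown below. JSON.parse stands in for msgpack.decode so the example has no dependencies.

```javascript
// Sketch of a map-to-push adapter (assumed behavior, not a published API).
function mapToPush(map) {
  return function (emit) {
    return function (err, item) {
      // Forward errors and end events through.
      if (item === undefined) return emit(err);
      var mapped;
      try {
        mapped = map(item);
      } catch (e) {
        // Assumption: a throwing map function becomes an error event.
        return emit(e);
      }
      emit(null, mapped);
    };
  };
}

// JSON.parse as a stand-in for msgpack.decode.
var events = [];
var write = mapToPush(JSON.parse)(function (err, item) {
  events.push([err ? "error" : null, item]);
});
write(null, '{"a":1}'); // normal item
write(null, '{oops');   // invalid input becomes an error event
write();                // end of stream
```

Whether a filter should keep accepting input after emitting an error is a design choice this spec leaves open; the sketch simply keeps going.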
