@creationix
Last active December 27, 2015 04:49
Streams proposals

Terminology

Promise/Continuable/Callback

I'll use the word "promise" to refer to some async action that will happen once at a later date. It's a non-blocking way to wait for a value. It will eventually resolve with either an error or a value.

Don't get caught up in terminology. If you prefer continuables over node-style callbacks, pretend in your mind that all code samples are using them. If you prefer callbacks, imagine that instead.

// Node style callback syntax
fs.readFile("myfile.txt", "utf8", function (err, text) {
  if (err) return handleError(err);
  console.log("here is the data", text);
});

// new Continuable style syntax
fs.readFile("myfile.txt", "utf8")(function (err, text) {
  if (err) return handleError(err);
  console.log("here is the data", text);
});

// old Continuable style syntax
fs.readFile("myfile.txt", "utf8")(function (text) {
  console.log("here is the data", text);
}, handleError);

// Promise style syntax
fs.readFile("myfile.txt", "utf8").then(function (text) {
  console.log("here is the data", text);
}, handleError);

Event handler/listener

Event handlers are a way to react to events that can happen zero or more times. You will need a way to clean up these listeners/handlers and a way to change the listener on the fly.

Again don't get caught up on the concrete syntax used. These are all the same thing from an abstract point of view. The exact implementation can be chosen at a later date after the basic abstract ideas are decided upon.

// Node style event listener
server.on("request", function (req, res) { ... });

// Dom style
window.addEventListener("load", function (evt) { ...}, false);

// direct function style
remote.onProgress = function (progress) { ... };

Goals

Low Level

All stream interface proposals should have the following low-level capabilities:

  • An async stream of data events. These can be data chunks (pieces of a file download as string or binary data) or discrete object values.
  • A way to notify consumers that there will be no more data (end).
  • A way for the consumer to notify the source that it doesn't want any more data (abort).
  • A sane way to report errors:
    • errors in the underlying connection (like a socket disconnect)
    • errors because of invalid input/data (bad data in a protocol stream, invalid utf8 in a text stream, etc.)
    • programming errors (general exceptions in user code or in libraries used during stream processing)

High Level

Also thought should be given to higher-level interfaces that allow treating streams as single-value primitives. These come in a few classes of functions.

  • Sources. These are things that accept configuration parameters and return a readable stream.
    • source(params) -> readable
  • Transforms. These are things that accept a stream and return a new stream with transformed data.
    • transform(readable) -> readable
  • Sinks/Destinations. These are things that consume streams. This could be the writable end of a TCP socket for example.
    • sink(readable) -> promise<done/error>
    • writable.sink(readable) -> promise<done/error>
  • Pipe. This is something that connects a readable stream to a writable stream.
    • pipe(readable, writable) -> promise<done/error>
    • readable.pipe(writable) -> promise<done/error>

Flow Streams

Flow streams are streams optimized for the case where you want to passively stream data. You aren't interested in the individual events, but rather just want to move data from point A to point B. You react to control-flow events to handle back-pressure.

Readable Flow

These should start paused so that no data events are lost in case the consumer isn't ready for them yet.

  • "data" event for each item in the stream (binary/text or object)
  • "end" event for when the stream is done (could be a special "data" event if a sentinel value like null or undefined is free)
  • .pause() method called by the consumer when it's not ready for more events. No events will be emitted after this call.
  • .resume() method called by the consumer when it's ready for more.
  • .abort() method called by the consumer to notify the source it won't want any more events, ever. (could be a special case of "pause")
  • "error" event for when the source stream has a problem. (could be a special "end" event)
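As a sketch of how these pieces fit together, here is a toy readable flow stream over an array, using the direct-function event style from the top of the page (flowReadable is an illustrative name):

```javascript
// Toy readable flow stream over an array.  Starts paused, per the rule above.
function flowReadable(items) {
  var paused = true, index = 0, done = false;
  var stream = {
    ondata: null,  // filled in by the consumer
    onend: null,   // filled in by the consumer
    pause: function () { paused = true; },
    resume: function () { paused = false; flush(); },
    abort: function () { done = true; }  // a permanent pause
  };
  function flush() {
    // Emit data events until paused, aborted, or out of items.
    while (!paused && !done && index < items.length) {
      stream.ondata(items[index++]);
    }
    if (!paused && !done && index === items.length) {
      done = true;
      stream.onend();
    }
  }
  return stream;
}
```

Because the stream starts paused, the consumer can attach ondata/onend at leisure and then call resume() without losing events.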

Writable Flow

  • .write(data) method called by source to send a data event to consumer.
  • .end() method called by source to tell consumer there won't be any more data.
  • "pause" event to tell the writer it should pause writing data. (could be the return value of write)
  • "resume" event to tell writer it's ready for more data.
  • "abort" event to tell writer the writable doesn't want more data ever.
  • "error" event to tell writer that there was a problem somewhere.
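The writable side can be sketched the same way. In this toy version the return value of write doubles as the "pause" signal, as suggested above (flowWritable and highWaterMark are illustrative names, not a real API):

```javascript
// Toy writable flow sink that buffers into an array.
function flowWritable(highWaterMark) {
  var buffer = [], ended = false;
  return {
    buffer: buffer,
    write: function (data) {
      buffer.push(data);
      // Returning false is the back-pressure ("pause") signal.
      return buffer.length < highWaterMark;
    },
    end: function (err) {
      ended = true;
      if (err) { /* report the error somewhere */ }
    },
    isEnded: function () { return ended; }
  };
}
```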

Active Streams

These are streams where every data chunk is actively read and written with its own async function call. There is no need for back-pressure APIs because this system naturally waits for data to be read or written before moving on.

Readable

  • .read() -> promise<data/end/error> - Ask for the next event. Will get either data, end, or error result.
  • .abort() - Tell the source you won't read again, ever.

Writable

  • .write(data/end) -> promise<written/error> - Write a chunk to the stream; the result arrives when the chunk is written or when there was an error.
  • "abort" - Event signaling that the writable won't be wanting more data, ever.
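A toy active readable over an array might look like this (a sketch only; here undefined stands in for the end-of-stream value, and errors would surface as rejected promises):

```javascript
// Each read() resolves with the next chunk, or undefined at end-of-stream.
function activeReadable(items) {
  var index = 0, aborted = false;
  return {
    read: function () {
      if (aborted || index >= items.length) return Promise.resolve(undefined);
      return Promise.resolve(items[index++]);
    },
    abort: function () { aborted = true; }
  };
}

// Consuming is a simple read loop; back-pressure falls out naturally because
// the next read only happens after the previous item has been handled.
function consumeAll(readable, onItem) {
  return readable.read().then(function (item) {
    if (item === undefined) return;
    onItem(item);
    return consumeAll(readable, onItem);
  });
}
```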

New Hybrid Streams

After using styles like these two in js-git and node over the past several years, I'm working on a new proposal that's a hybrid.

Readable

The stream starts paused. If there is an "end" event (for either EOS or error) it will be the last event ever.

  • "data" event contains data. If you return falsy, the source will auto-pause. Otherwise it will continue flowing.
  • "end" event. Will have an (err) param if there was an error.
  • .resume() - method to tell the source you're ready for the next data/end event. It will continue flowing till abort or ondata returns falsy.
  • .abort() method to tell the source you won't be wanting any more ever.
// Syntax is simple events, but see section at top of page for other options.
readable.ondata = function (data) {
  // Handle data
  // Return true telling the source we still want data.
  return true;
};
readable.onend = function (err) {
  // continuation logic goes here
};
// Start the stream.
readable.resume();

Another syntax for adding the event listeners could be a setup function that returns the instance for easy chaining:

readable.setup(function (data) {
  // handle data
  return true;
}, function (err) {
  if (err) return errorHandler(err);
  // handle end
}).resume();
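For a concrete source implementing this interface, a toy array-backed readable might look like the following (hybridFromArray is an illustrative name, not part of the proposal):

```javascript
function hybridFromArray(items) {
  var paused = true, index = 0, ended = false;
  var stream = {
    ondata: null,
    onend: null,
    setup: function (onData, onEnd) {
      stream.ondata = onData;
      stream.onend = onEnd;
      return stream;  // return the instance for easy chaining
    },
    resume: function () {
      paused = false;
      while (!paused && index < items.length) {
        // Auto-pause when the consumer returns falsy.
        if (!stream.ondata(items[index++])) paused = true;
      }
      // Emit the "end" event (no error) once, when the data runs out.
      if (!paused && !ended && index === items.length) {
        ended = true;
        stream.onend();
      }
    },
    abort: function () { ended = true; index = items.length; }
  };
  return stream;
}
```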

A simple transform that takes a stream of strings as input and emits character codes as numbers on the output stream. (uses the first event syntax)

function split(input) {
  // Start the new stream paused.
  var paused = true;
  // Queue for output events.
  var queue = [];
  // Set once the input ends; holds the error, if any.
  var ended = false, endError;
  // New output stream.  Event listeners will be put on this.
  var out = {
    abort: input.abort,  // Direct reference passing
    resume: resume
  };

  // Consume the input using the setup syntax.
  input.setup(function (string) {
    for (var i = 0, l = string.length; i < l; i++) {
      queue.push(string.charCodeAt(i));
    }
    check();
    // Keep the input flowing only while our consumer wants data.
    return !paused;
  }, function (err) {
    ended = true;
    endError = err;
    check();
  });

  return out;

  function resume() {
    paused = false;
    check();
    if (!paused && !ended) input.resume();
  }

  function check() {
    // Flush pending data to the output.
    while (!paused && queue.length) {
      // Auto-pause if the consumer returns falsy.
      if (!out.ondata(queue.shift())) paused = true;
    }
    // Forward end-of-stream once the queue is drained.
    if (!paused && ended && queue.length === 0) out.onend(endError);
  }
}

Abstract

Abstractly, a stream is a two-way interface. The producer sends 0 or more data values over time to the consumer, after which it notifies the consumer there will be no more data (with an optional reason, such as a disconnect error). In the other direction, the consumer tells the producer when it needs to pause and when it's ready for more data. The consumer also tells the producer when it's no longer interested in any data and will no longer be sending control-flow signals. This abort message can optionally carry a reason (such as a disconnect error).

P         C
 --DATA-->
\--END--->\
/--ERROR->/
/<-PAUSE--/
\<-RESUME-\
/<-ABORT--/
\<-ERROR--\

In a tweet-sized ASCII diagram, that's really all a stream is. It's a finite or infinite data stream through time one direction with a control-flow signal through time the other direction.

Minimalist

Condensing this down to a minimalist interface, we have the following:

// The null properties are to be filled in by consumer with handler functions.
var readable = {
  onwrite: null,
  onend: null,
  flow: function (isFlowing) { ... },
  abort: function (err) { ... }
};

// The null properties are to be filled in by producer with handler functions.
var writable = {
  write: function (item) { ... },
  end: function (err) { ... },
  onflow: null,
  onabort: null,
};

Setting these on* handlers manually is ugly and error-prone. Also, most streams can't start executing until the handlers are set up. So for ease of use, let's add some setup functions to both sides and hide the handlers.

function makeReadable() {
  // Will be references to handler functions in consumer.
  var write, end;
  // Internal flags for flow-control
  var isFlowing = false, isDone = false;

  return { produce: produce, flow: flow, abort: abort };
  
  function produce(onWrite, onEnd) {
    write = onWrite;
    end = onEnd;
    isFlowing = true;
    // TODO: Start doing stuff...
  }
  
  function flow(flowing) {
    isFlowing = flowing;
    // TODO: Check state
  }
  
  function abort(err) {
    isDone = true;
    // TODO: cleanup resources
    if (err) handleError(err);
  }
}

function makeWritable() {
  var flow, abort;
  var flowing = true, isDone = false;
  
  return { consume: consume, write: write, end: end };
  
  function consume(onFlow, onAbort) {
    flow = onFlow;
    abort = onAbort;
  }
  
  function write(item) {
    // TODO: write value somewhere.
    // Tell producer to speed up or slow down if needed.
    // (tooFast is a placeholder for a real back-pressure check.)
    if (tooFast === flowing) {
      flowing = !flowing;
      flow(flowing);
    }
  }
  
  function end(err) {
    isDone = true;
    // clean up resources
  }
}

Since the API surfaces don't overlap, a duplex stream can be created by simply implementing both interfaces at once.

A pipe helper that connects a readable interface to a writable interface can be as simple as:

function pipe(readable, writable) {
  writable.consume(readable.flow, readable.abort);
  readable.produce(writable.write, writable.end);
}
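To see the contract in action, here are two toy endpoints wired together (a sketch; arraySource and arraySink are illustrative names, and the pipe helper is repeated so the example runs standalone):

```javascript
// Toy source that streams an array's items.
function arraySource(items) {
  var write, end, flowing = false, index = 0;
  function check() {
    while (flowing && index < items.length) write(items[index++]);
    if (flowing && index === items.length) {
      flowing = false;
      end();
    }
  }
  return {
    produce: function (onWrite, onEnd) {
      write = onWrite;
      end = onEnd;
      flowing = true;
      check();
    },
    flow: function (isFlowing) { flowing = isFlowing; check(); },
    abort: function (err) { index = items.length; flowing = false; }
  };
}

// Toy sink that collects items onto a results array.
function arraySink(results) {
  return {
    consume: function (onFlow, onAbort) { /* keep these for back-pressure */ },
    write: function (item) { results.push(item); },
    end: function (err) { results.ended = true; }
  };
}

// Same pipe helper as above, repeated so this sketch is self-contained.
function pipe(readable, writable) {
  writable.consume(readable.flow, readable.abort);
  readable.produce(writable.write, writable.end);
}
```

Running pipe(arraySource([1, 2, 3]), arraySink(results)) pushes each item into results and then marks it ended, all driven by the two setup calls inside pipe.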

Constructor style Syntax

First some helper functions that act like parent classes.

function WritableSetup(self, readable) {
  // TODO: shared setup logic
}

function WritableWrite(self) {
  // TODO: shared write logic
}

function WritableEnd(self, err) {
  // TODO: shared end logic
}

Here is an example TCP duplex stream wrapping https://developer.mozilla.org/en-US/docs/WebAPI/TCP_Socket

// Given a TCPSocket instance, wrap it as a stream.
function TCPStream(socket) {
  // Keep a reference to the socket for flow control later.
  this.socket = socket;

  // placeholder for the stream that will read from this.
  this.dest = null;

  // placeholder for the stream that will write to this.
  this.source = null;

  // Flag to make sure we don't end more than once.
  this.done = false;

  var self = this;
  socket.ondata = function (evt) {
    // Forward data to the destination stream.
    self.dest.write(evt.data);
  };
  socket.ondrain = function () {
   // Tell the source it can write to us again.
    self.source.flow(true);
  };
  socket.onerror = function (evt) {
    if (self.done) return;
    self.done = true;
    // Tell both stream interfaces about the problem.
    var error = new Error(evt.data);
    self.source.abort(error);
    self.dest.end(error);
  };
  socket.onclose = function () {
    if (self.done) return;
    self.done = true;
    // Tell both sides the stream is done
    self.source.abort();
    self.dest.end();
  };
}

// Define the readable prototype methods

TCPStream.prototype.pipe = function (dest) {
  this.dest = dest;
  dest.setup(this);
};
TCPStream.prototype.flow = function (flowing) {
  if (flowing) {
    this.socket.resume();
  }
  else {
    this.socket.suspend();
  }
};
TCPStream.prototype.abort = function (err) {
  if (this.done) return;
  this.aborted = err || true;
  // TODO: Implement
};

// Define the writable prototype methods

TCPStream.prototype.setup = function (source) {
  this.source = source;
};
TCPStream.prototype.write = function (data) {
  // Send the data to the socket.
  if (this.socket.send(data)) {
    // If the socket is full, tell our source to slow things down.
    this.source.flow(false);
  }
};
TCPStream.prototype.end = function (err) {
  if (this.done) return;
  this.ended = err || true;
  // TODO: Implement
};