Streams API Potential Simplifications: Investigative Report

Part 1: Consolidated Write

This proposed change is semantic.

Background

Right now, we have two asynchronous signals given off by our WritableStreams:

  • wait()'s return value fulfills to let you know when the stream is ready to accept more data (i.e., there is room in its internal queue)
  • write(chunk)'s return value fulfills to let you know when chunk has been flushed to the underlying sink

Most "reactive" stream or observable abstractions only include support for the former signal. That is, their interface is:

var p = writableStream.write(chunk);
// `p` fulfills when `writableStream` is ready to accept more data

So for a writable stream with a reasonably-sized internal queue, the first few promises returned will immediately fulfill, despite the underlying sink not having seen the data yet.

Changes to WritableStream

  • Combine asynchronous wait() and asynchronous write() into asynchronous write() whose returned promise behaves the same as that of wait().
  • The state property is no longer present.

Discussion

This is somewhat justifiable, as you can argue that from a consumer's perspective, once the chunk is "accepted" by the stream, the consumer's job is done. But it shows a pretty big mismatch between streams-in-the-abstract and streams-wrapping-sinks.

As such, we would still want to support some way of knowing whether a write to the underlying sink has failed. This would be done through the closed promise (also returned by the close() method). Thus, to know whether writes made their way to the underlying sink, you would do:

// Ignore return values since they don't tell us about the underlying sink
writableStream.write(chunk1);
writableStream.write(chunk2);
writableStream.write(chunk3);
writableStream.close().then(
  () => console.log("Successfully closed"),
  e => console.error("One of starting the stream, writing chunk 1, 2, or 3, or closing the stream failed", e)
);

This is notably less convenient for those who wish to manually communicate data to, e.g., a socket; there is no real way of knowing whether the socket accepted the data, or if it was queued for later consumption. (We will see an echo of this point in part 3; stay tuned.)

Note also that the return value of write() being "ready to accept more data" is not terribly useful to any consumer except the pipe algorithm. (Just like wait() in the current design is not terribly useful either.) Knowing when the stream is ready to accept more data matters only when you want to communicate backpressure to the source of your data, which will generally only happen when you are piping.
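
For concreteness, here is a rough sketch (not the actual pipe algorithm) of how a pipe loop would be the one consumer of that signal. It assumes the consolidated write() from this part plus today's wait()/read()/state readable interface, and omits error and abort handling:

function pipe(readable, writable) {
  return pump();

  function pump() {
    if (readable.state === "readable") {
      // don't read again until the writable side says it is ready for more
      return writable.write(readable.read()).then(pump);
    }
    if (readable.state === "closed") {
      return writable.close();
    }
    // state === "waiting": no data available yet; wait for the source
    return readable.wait().then(pump);
  }
}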

With these two points combined, I do not really like this idea. It is conceptually nice to use a promise return value for this signal, but in terms of actual user-facing ergonomics, it is harmful.

Part 2: Async Read

This proposed change is largely cosmetic. It does not impact the semantics of the API, and indeed can be built on top of the existing API.

Changes to ReadableStream

  • Combine asynchronous wait() plus synchronous read() into promise-returning asynchronous read().
  • If the stream becomes closed, the promise returned by read() fulfills with ReadableStream.EOS ("end of stream")
  • The state property is no longer present.
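
To back the earlier claim that this can be layered on top of the existing API, here is a rough sketch; it assumes the current wait()/read()/state design (with states "readable", "waiting", "closed", and "errored") and glosses over the details of error propagation:

function asyncRead(readable) {
  if (readable.state === "readable") {
    return Promise.resolve(readable.read());
  }
  if (readable.state === "closed") {
    return Promise.resolve(ReadableStream.EOS);
  }
  // "waiting" (or "errored"): wait() settles when data arrives, the stream
  // closes, or the stream errors; then check the state again.
  return readable.wait().then(() => asyncRead(readable));
}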

Examples

Consumption code becomes:

// ES6
function readableStreamToArray(readable) {
  var chunks = [];

  return pump();

  function pump() {
    return readable.read().then(chunk => {
      if (chunk === ReadableStream.EOS) {
        return chunks;
      }
      chunks.push(chunk);
      return pump();
    });
  }
}

// ES7
async function readableStreamToArray(readable) {
  const chunks = [];

  let chunk;
  while ((chunk = await readable.read()) !== ReadableStream.EOS) {
    chunks.push(chunk);
  }
  return chunks;
}

Creation code stays the same.

Variations Considered

The use of EOS is slightly strange. Our current design avoids it via the state property. Why can't we do that here?

It turns out, having written a series of tests for this style, that there are many cases where you don't know ahead of time, when calling read(), whether the stream will be closed or not. If the stream does become closed, there is no data left to give. In that case, a state-based design would have to either reject or return a sentinel value. The former forces your code into weird contortions, as this non-exceptional case ends up being handled via rejections (or exceptions, with async function syntax). And the latter brings us right back to EOS. In neither case does providing a state property, and telling the user to consult it ahead of time, help.
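
To illustrate the former, here is a sketch of what consumption code would look like under a rejection-based design; EndOfStreamError is made up purely for illustration:

// ES7
async function readableStreamToArray(readable) {
  const chunks = [];
  try {
    while (true) {
      chunks.push(await readable.read()); // would reject once the stream is closed
    }
  } catch (e) {
    if (e instanceof EndOfStreamError) {
      return chunks; // normal termination, yet handled as an exception
    }
    throw e; // a real error
  }
}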

An alternate design is to use ECMAScript-iterator style { value, done } objects. However, without syntax support, this is fairly annoying to deal with, e.g.:

// ES7
async function readableStreamToArray(readable) {
  const chunks = [];

  let iterResult;
  while (!(iterResult = await readable.read()).done) {
    chunks.push(iterResult.value);
  }
  return chunks;
}

Nevertheless, { value, done } is a reasonable alternative to EOS. It has the theoretical advantage that you can then put any ECMAScript value in your stream, whereas with EOS there is a single value you cannot place in the stream (namely, ReadableStream.EOS itself).

Part 3: Streams as Two-Sided Pipes

An attractive design for creating streams is to always create them as { readable, writable } pairs. These paired streams would share a queue, such that writing to the writable side then allows you to read that same value from the readable side, in FIFO order.

Indeed, you can see echoes of this design in the constructor signature of our existing streams, combined with the above asyncification changes. Consider:

var rs = new ReadableStream({
  start(enqueue, close, error) {},
  pull(enqueue, close, error) {},
  cancel(reason) {}
});

rs.read();
rs.cancel(reason);

var ws = new WritableStream({
  start(error) {},
  write(chunk, done, error) {},
  close() {},
  abort(reason) {}
});

ws.write(chunk);
ws.close();
ws.abort(reason);

In one direction, the (enqueue, close, error) signature shared by start and pull looks very similar to the signature of a WritableStream with its write, close, abort methods. It is almost as if the readable stream's creator is writing to some kind of internal writable stream.

In the other direction, you could combine the start and write signatures into start(read, cancel) or similar: i.e., instead of getting each chunk that is written to it forwarded to the write function directly, it could read them using an async function, from an internal queue.
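
A sketch of what that might look like; everything here (the read callback, WritableStream.EOS, and the sendToSink/closeSink helpers) is hypothetical, and serves only to make the shape concrete:

var ws = new WritableStream({
  start(read, cancel) {
    return pump();

    function pump() {
      // pull the next chunk that was written to the stream's internal queue
      return read().then(chunk => {
        if (chunk === WritableStream.EOS) {
          return closeSink(); // hypothetical: close the underlying sink
        }
        return sendToSink(chunk).then(pump); // hypothetical: write to the underlying sink
      });
    }
  }
});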

But of course, you cannot say that ReadableStream is constructed with an internal WritableStream, which in turn is constructed with an internal ReadableStream, ad infinitum. The solution is to have each of them be constructed via a third primitive, the queue. So:

var queue = new Queue();
var rs = new ReadableStream(queue);
var ws = new WritableStream(queue);

// returns promise for when the queue is ready for more
queue.enqueue(chunk);

// returns promise for a chunk or for Queue.EMPTY
// alternately uses { value, done } design
queue.dequeue();

// flushes queue; no more enqueues allowed;
// further calls to dequeue will return finalPromise
queue.close(finalPromise);

The streams and the queue are then connected like so:

rs.read(); // calls queue.dequeue()
rs.cancel(er); // calls queue.close(Promise.reject(er));

ws.write(chunk); // calls queue.enqueue(chunk)
ws.close(); // calls queue.close(Promise.resolve())
ws.abort(er); // calls queue.close(Promise.reject(er))

That is, in this design, the stream interfaces essentially just take on the job of exposing and adapting a subset of the queue interface.

Note how cancel has become more destructive than it was previously; it is now the counterpart to abort, signaling a real error, instead of just loss of interest. This is necessary since it needs to be reflected on the other, writable side of the pair. This goes back on the decisions made in, e.g., #67 and #90.

(Another note for the curious: we cannot use the revealing-constructor pattern, of new ReadableStream((queue) => ...), since that implies each ReadableStream has its own Queue instance, and thus defeats our goal of allowing them to be shared between a readable and writable stream.)

Real-World Usage

How, then, do I create a writable stream wrapping a web socket, as in the writable stream intro for example? It would have to look something like one of these two possibilities:

// A: using the queue directly
function makeWritableWebSocketStream(url, protocols) {
  const socket = new WebSocket(url, protocols);
  var queue = new Queue();
  var ws = new WritableStream(queue);

  socket.onerror = er => queue.close(Promise.reject(er));
  socket.onopen = pump;
  return ws;

  function pump() {
    queue.dequeue().then(chunk => {
      if (chunk === Queue.EMPTY) {
        // NB: no way to signal failure or success of the close itself
        socket.close();
        return;
      }

      socket.send(chunk);
      pump();
    });
  }
}

// B: using a paired readable stream
function makeWritableWebSocketStream(url, protocols) {
  const socket = new WebSocket(url, protocols);

  // presumably this is a standard function that sugars over
  // manually creating two streams and one queue
  var { readable: rs, writable: ws } = makeStreamPair();

  socket.onerror = er => rs.cancel(er);
  socket.onopen = pump;
  return ws;

  function pump() {
    rs.read().then(chunk => {
      if (chunk === ReadableStream.EOS) {
        // NB: no way to signal failure or success of the close itself
        socket.close();
        return;
      }

      socket.send(chunk);
      pump();
    });
  }
}
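
For reference, the presumed makeStreamPair() sugar would be roughly:

function makeStreamPair() {
  var queue = new Queue();
  return {
    readable: new ReadableStream(queue),
    writable: new WritableStream(queue)
  };
}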

Note that in both cases, since the queue has already been closed from the readable side, we don't have any way to signal the asynchronous success or failure of closing the underlying sink. This echoes our earlier problem with the consolidated write(), where since we had already vended a promise signaling that we could accept more data, we cannot use that promise to signal the status of the actual operation on the underlying sink.

A similar exercise can be carried out for readable streams. However, it requires quite some trickery to handle both pull and push underlying sources. Push underlying sources are simplest, as you can just enqueue into the queue as data arrives, but for pull ones, you have to manually invert your control, e.g. as seen in kriskowal/gtor#15, so that you only pull from the source when asked for data.
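
For instance, a rough sketch of the push case, wrapping a WebSocket as a readable stream by reusing the Queue primitive from above; note that no backpressure is propagated to the socket:

function makeReadableWebSocketStream(url, protocols) {
  const socket = new WebSocket(url, protocols);
  var queue = new Queue();
  var rs = new ReadableStream(queue);

  // push source: just enqueue data as it arrives
  socket.onmessage = ev => queue.enqueue(ev.data);
  socket.onclose = () => queue.close(Promise.resolve());
  socket.onerror = er => queue.close(Promise.reject(er));

  return rs;
}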

Conclusion

Two weeks ago I started out down this path in earnest, and was hopeful. I prototyped async read(), and found that although the EOS thing was kind of strange, it was a small upgrade in readability, especially with ES7 async/await syntax. I even rewrote the test suite for readable streams using this style. I then ran into the problems with consolidated wait() and write(chunk), but convinced myself to press on: after all, the pipe algorithm is most of what matters, I thought. But when I finally started trying to use the paired-streams design, I ran into enough problems that I can't in good conscience continue this effort.

In all these cases, the design ends up being conceptually elegant, but ill-suited to the real-world problem of wrapping underlying sources and sinks. You do want, as a first-class part of your API, to know when a value is flushed to the socket, or a file descriptor is properly closed. And you want an easy syntax for creating streams wrapping such resources, instead of having to create a pair then pump one side or invert the control on the other.

In the end, the async-read() simplification is the only one that emerges from all of this as potentially useful. But it doesn't really carry its own weight, in my opinion. Do others think it does? If so, would they prefer the { value, done } or EOS design?

In any case, I now feel pretty content to leave the issue of reactive-style high-level sugar to libraries. Remember, the intent of the streams API was never to provide first-class events, event streams, observables, async iterables, or similar. Even though some of those use cases do indeed want backpressure and all that comes with it, in the end our primary goal here is to wrap the many sources and sinks present in the web and server ecosystems into a unified API; it is not to create a purely in-memory reactive graph through which data can flow declaratively. You can definitely build the latter on top of the former, and people do. But the lower level needs to be adapted for the primary use case instead of being designed as an elegant two-ended promise queue that falls down on the edge cases surrounding real-world resources. If you're OK with that tradeoff, it should be pretty simple to layer such an idea on top of whatwg/streams, similar to what has been done for Node streams. Indeed, I hope it will be much easier, given our more robust error handling and the like.

And of course, I maintain that readable streams will implement whatever async iterable interface makes its way into ES7 or ES8 or whatever, so that they can be used with for-on and the like.
