Streams for the Web

Inspired by Isaac's message at http://lists.w3.org/Archives/Public/public-webapps/2013JulSep/0355.html.

Points taken into account as important:

  • Drop any idea of "read n bytes."
  • Reading strings vs. ArrayBuffers vs. anything else must be a property of the stream. As such, we only handle binary data for now. However, our design is sound for any "concatable" resource, so strings work just as well.

Points where we differ:

  • Isaac contends that, if there is already buffered data, waiting until the end of the microtask to consume it is unacceptable. We are not sure this is true; waiting certainly gives a more uniform and less awkward API, and may in practice not come at a cost. This is up for debate. It may be better to add both APIs, with the intent that the higher-level promise-returning read method is built on top of the lower-level poll + readBuffer + state primitives Isaac describes.
  • Furthermore, promises are the web platform's async primitive, and as of ES6 the language's, so it is not problematic to layer on top of them as it would be in Node and ES5.

Readable Streams

class ReadableStream {
  constructor({ ToNumber highWaterMark }, function onRead)
  Promise<ArrayBuffer|undefined> read()
  WritableStream pipe(WritableStream destination)
}

read

  • Calling read returns a promise for an ArrayBuffer. If bytes are available right now, the promise is fulfilled with all available bytes in the stream; otherwise, it waits and fulfills once bytes become available.
  • If there are no bytes left in the stream, the promise is fulfilled with undefined instead.
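
As a minimal illustration of both fulfillment cases (getTheStream is a hypothetical function returning a ReadableStream):

const myStream = getTheStream();

myStream.read().then(data => {
    if (data === undefined) {
        console.log("no bytes left; the stream is finished");
    } else {
        console.log("got " + data.byteLength + " bytes");
    }
});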

onRead

  • This is called whenever the user calls .read(), and is passed three parameters: push, done, and error.
  • When data is available, call push(data).
    • If it returns false, then nobody wants any more data, and the underlying buffer is full, so you should stop pushing as soon as possible, e.g. by pausing the underlying socket. onRead will be called again when someone starts wanting data.
    • The highWaterMark value determines when it starts returning false, i.e. when the underlying buffer is full. However, the buffer can expand indefinitely if you ignore the return value signal and keep pushing data.
    • If push returns true, then you can keep calling push as more data becomes available from the underlying resource.
  • Once there is no more data in the underlying resource, call done().
  • If at any point an error occurs and you can no longer read any data from the underlying resource, call error, preferably with an Error instance.
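
As a sketch of how a producer might honor these signals, assume a hypothetical pausable byte source with pause(), resume(), and the usual event hooks (none of which are part of this proposal):

const source = openTheUnderlyingSource(); // hypothetical, e.g. a TCP socket wrapper

const stream = new ReadableStream({ highWaterMark: 1024 }, (push, done, error) => {
    source.ondata = chunk => {
        if (!push(chunk)) {
            // The internal buffer has hit highWaterMark; stop producing.
            source.pause();
        }
    };
    source.onend = done;
    source.onerror = err => error(err);

    // onRead is called again when someone starts wanting data, so restart the flow.
    source.resume();
});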

pipe

  • See below

Examples

Consuming a Stream

Using ES6 generators and a library like Q or TaskJS to consume promises using yield, you get very nice consumption code:

Q.spawn(function* () {
    const myStream = getTheStream();

    let data;
    while (data = yield myStream.read()) {
        console.log(data);
    }
});
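
The same loop with ES7 await (mentioned under Open Issues below) is equally direct:

async function consume() {
    const myStream = getTheStream();

    let data;
    while (data = await myStream.read()) {
        console.log(data);
    }
}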

Contrived API Example

This example shows a contrived stream that emits a predetermined sequence of bytes, with one second delay in the middle, just to show how the API works. Since there is no underlying source of data, there is no reasonable way for it to respond to backpressure signals (i.e. there is no TCP socket to pause or such). But it shows the mechanism for how the body of onRead interacts with read.

// ab2str and str2ab from http://stackoverflow.com/a/11058858/3191

const s = new ReadableStream({ highWaterMark: 1024 }, (push, done, error) => {
    push(str2ab("123"));
    push(str2ab("456"));

    setTimeout(() => {
        push(str2ab("789"));
        done();

        push(str2ab("101112")); // ignored
        error(new Error("whatever")); // ignored
    }, 1000);
});

s.read(); // promise for ArrayBuffer of "123456"
s.read(); // promise for ArrayBuffer of "123456"

s.read().then(val => {
    assert(ab2str(val) === "123456");

    s.read(); // promise for ArrayBuffer of "789"

    s.read().then(val => {
        assert(ab2str(val) === "789");

        s.read(); // promise for `undefined`.
    });
});

Producing a Stream, no Backpressure

This example adapts an incoming stream of websocket messages into a ReadableStream. Since websockets do not expose a method for sending pause messages to the underlying socket, we do not respond to backpressure here.

const socket = new WebSocket("ws://example.com/push-me-data");
socket.binaryType = 'arraybuffer';

const stream = new ReadableStream({ highWaterMark: 1024 }, (push, done, error) => {
    socket.onerror = function () {
        error(new Error('WebSocket error'));
    };

    socket.onmessage = function (event) {
        push(event.data);
    };

    socket.onclose = done;
});

Producing a Stream, with Backpressure

TODO: maybe build an ad-hoc websocket protocol where we tell the server to stop sending via some message.
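
A sketch of one possible shape, assuming a cooperating server that understands hypothetical 'pause' and 'resume' text messages:

const socket = new WebSocket("ws://example.com/push-me-data");
socket.binaryType = 'arraybuffer';

const stream = new ReadableStream({ highWaterMark: 1024 }, (push, done, error) => {
    socket.onerror = function () {
        error(new Error('WebSocket error'));
    };

    socket.onmessage = function (event) {
        if (!push(event.data)) {
            // Buffer is full; ask the server to stop sending.
            socket.send('pause');
        }
    };

    socket.onclose = done;

    // onRead runs again when a consumer wants data, so ask for more.
    if (socket.readyState === WebSocket.OPEN) {
        socket.send('resume');
    }
});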

Open Issues

  • Do we need a way to signal loss of interest in the source of data ("abort")? This would avoid implementations buffering incoming data that nobody will use, or keeping open sockets or file descriptors that will never be read from again.
    • Node doesn't have this in the base stream interface. But it has ad-hoc versions in certain streams.
    • The benefit of having it in the base stream interface is that an abort signal could flow backward through piped streams.
    • Concurrent read and abort? Reference counting?? How would you know? This might necessitate introducing a streamreader-type class, ick.
  • done() + error(err) vs. done() + done(err).
  • Options object as first argument to constructor is annoying, since it makes it non-optional (or if we did make it optional, new ReadableStream(undefined, (...) => ...) isn't exactly pretty). But putting it after the function parameter seems even worse.
  • Is Node's unshift necessary? If so, is that really a good name?
    • Gut instinct: you could build it in userspace, so it's not technically necessary for first draft.
  • { boolean done, ArrayBuffer value } instead of ArrayBuffer|undefined would negate needing one more read() call after the data is finished to find out if you're truly finished.
    • However, after exploring this, I think the parallel with generators is misplaced, and would be misleading.
    • It hurts the nice consumer API using generators (or ES7 await) that the promise version gives; see the sketch after this list.
  • The concatenating I do here is probably not necessary or desirable. Kill it if possible.
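
To see the cost mentioned above, compare the consumer loops under each shape (stream and process are hypothetical stand-ins):

// With ArrayBuffer|undefined, falsiness ends the loop directly:
Q.spawn(function* () {
    let data;
    while (data = yield stream.read()) {
        process(data);
    }
});

// With { done, value }, every iteration needs an extra unwrapping step:
Q.spawn(function* () {
    let result;
    while (!(result = yield stream.read()).done) {
        process(result.value);
    }
});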

Writable Streams

Writable streams are significantly simpler than readable streams. But I haven't written them down yet, eek!
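
Purely as a speculative placeholder mirroring the readable side (nothing here is settled), the shape might look like:

class WritableStream {
  constructor({ ToNumber highWaterMark }, function onWrite)
  Promise<undefined> write(ArrayBuffer data)
  Promise<undefined> close()
}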

Piping Readable Streams to Writable Streams
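
Given the speculative WritableStream shape sketched above, pipe could be a simple read/write loop, using write()'s promise as the backpressure signal from the writable side (a sketch, not a normative algorithm):

ReadableStream.prototype.pipe = function (destination) {
    const loop = () => this.read().then(data => {
        if (data === undefined) {
            // Source is exhausted; close the destination.
            return destination.close();
        }
        // Wait for the write to be accepted before reading more.
        return destination.write(data).then(loop);
    });

    loop(); // TODO: error propagation between the two sides is an open issue

    return destination;
};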

Other Open Issues

  • Should be sure to create mixin versions of these, for things that don't want to implement the constructor (e.g. built-in streams). This would allow easy creation of duplex streams as well.
@lightsofapollo

Hi! I had some random discussions with sicking about this over the last two weeks. My use cases are heavily influenced not only by node but also by what we planned for FirefoxOS (and my general sadness at the lack of "streams" in our tcp/http apis).

  • Readable streams should be postMessage friendly (Transferable?)
  • Abort: This can be very useful but mostly in edge cases (user cancels some network bound operation happening on another thread which is then processed in some way)
  • Consuming objects as part of a stream ( there are various other proposals on EventStreams and other things ) but particularly in the case where content can create its own streams I like the idea of having the capability to do something like this:
    • Get a stream from xhr/tcp; let's call this platformReadable
    • Pipe [either via some content wrapper or as part of the api] the stream into contentConvertsToObject
    • read objects from contentConvertsToObject
  • Passing functions in web content outside of event listeners is always awkward, but done(err) FTW if I can influence this somehow :)
  • Back pressure in content: https://gist.github.com/wycats/dc1400f7f54a6ae85a32 <- I found this approach interesting and more explicit vs returning a Boolean

@lightsofapollo

High water mark (this is only in the context of readable)

Node's internal use of highWaterMark is mostly in conjunction with an explicit read method that takes an explicit number of bytes, triggered by a request for data.

To have sane piping I believe we need the ability to halt the flow of data on the readable side (if the writer is blocked we can attempt to pause the reader). This is where having some enforced convention seems useful:

An example with our current api:

const stream = new ReadableStream({ highWaterMark: 1024 }, (push, done, error) => {
  var timerId;

  function spam() {
    // In this variant, push returns a promise when the buffer is full.
    var defer = push(str2ab("why can't you come up with a better example"));

    // let's say you're a good person and wait for the buffer to drain
    if (defer) return defer.then(spam);

    timerId = setTimeout(spam);
  }

  spam();
});

In the case where the promise is ignored (should we also set a flag?), we could report it somehow (onerror?).


From here it feels like we are sort of back to .pause and .resume in node, and piping could be implemented on top of these, responding to back pressure from writers.
