Inspired by Isaac's message at http://lists.w3.org/Archives/Public/public-webapps/2013JulSep/0355.html.
Points taken into account as important:
- Drop any idea of "read `n` bytes."
- Reading strings vs. `ArrayBuffer`s vs. anything else must be a property of the stream. As such, we only handle binary data for now. However, our design is sound for any "concatable" resource, so strings work just as well.
Points where we differ:
- Isaac contends that, if there is already buffered data, waiting until the end of the microtask to consume it is unacceptable. We are not sure this is true; waiting certainly gives a more uniform and less awkward API, and may in practice not come at a cost. This is up for debate. It may be better to add both APIs, with the intent that the higher-level promise-returning `read` method is built on top of the lower-level `poll` + `readBuffer` + `state` primitives Isaac describes.
- Furthermore, promises are the web platform's async primitive, and as of ES6 the language's, so it is not as problematic to layer on top of them as it would be in Node and ES5.
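The "add both APIs" layering could look roughly like the following sketch. This is only an illustration: the `poll`/`readBuffer`/`state` names and their contract are loose assumptions based on Isaac's proposal, not a settled API.

```js
// Sketch: building a promise-returning read() on top of hypothetical
// poll()/readBuffer()/state primitives. Assumed lower-level contract:
// - stream.state is "readable", "waiting", "closed", or "errored"
// - stream.readBuffer() synchronously returns all buffered data
// - stream.poll(cb) calls cb once the state leaves "waiting"

function makePromiseRead(lowLevelStream) {
  return function read() {
    return new Promise((resolve, reject) => {
      const attempt = () => {
        switch (lowLevelStream.state) {
          case "readable":
            resolve(lowLevelStream.readBuffer());
            break;
          case "closed":
            resolve(undefined); // end of stream
            break;
          case "errored":
            reject(lowLevelStream.error);
            break;
          default: // "waiting": try again once data (or EOF) arrives
            lowLevelStream.poll(attempt);
        }
      };
      attempt();
    });
  };
}
```

The point is that the promise layer adds no state of its own; it just translates the synchronous primitives into fulfillments.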
```js
class ReadableStream {
    constructor({ ToNumber highWaterMark }, function onRead)

    Promise<ArrayBuffer|undefined> read()
    WritableStream pipe(WritableStream destination)
}
```
- Calling `read` returns a promise for an `ArrayBuffer` containing all available bytes in the stream if there are bytes available right now, or waits until bytes are available and then fulfills the promise.
- If there are no bytes left in the stream, the promise is fulfilled with `undefined` instead.
- The `onRead` function is called whenever the user calls `.read()`, and is passed three parameters: `push`, `done`, and `error`.
- When data is available, call `push(data)`.
  - If it returns `true`, you can keep calling `push` as more data becomes available from the underlying resource.
  - If it returns `false`, then nobody wants any more data, and the underlying buffer is full, so you should stop pushing as soon as possible, e.g. by pausing the underlying socket. `onRead` will be called again when someone starts wanting data.
  - The `highWaterMark` value determines when `push` starts returning `false`, i.e. when the underlying buffer is full. However, the buffer can expand indefinitely if you ignore the return-value signal and keep pushing data.
- Once there is no more data in the underlying resource, call `done()`.
- If at any point an error occurs and you can no longer read any data from the underlying resource, call `error`, preferably with an `Error` instance.
- For examples, see below.
Using ES6 generators and a library like Q or TaskJS to consume promises using `yield`, you get the very nice consumption code:
```js
Q.spawn(function* () {
    const myStream = getTheStream();

    let data;
    while ((data = yield myStream.read()) !== undefined) {
        console.log(data);
    }
});
```
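The same consumption loop can also be written with an ES7-style `async` function instead of `Q.spawn` plus generators. In this sketch, `getTheStream` is stubbed with a fixed sequence of chunks purely so the shape of the loop is concrete; the stub is not part of the proposed API.

```js
// Hypothetical stub standing in for a real stream, for illustration only:
// read() resolves successive chunks, then undefined to signal end-of-stream.
function getTheStream() {
  const chunks = ["123", "456"];
  return {
    read: () => Promise.resolve(chunks.length > 0 ? chunks.shift() : undefined),
  };
}

// Collects every chunk until read() fulfills with undefined.
async function consume(stream) {
  const result = [];

  let data;
  while ((data = await stream.read()) !== undefined) {
    result.push(data);
  }
  return result;
}
```

Note how the `undefined` end-of-stream sentinel makes the loop condition a single expression, in either the generator or the `async` formulation.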
This example shows a contrived stream that emits a predetermined sequence of bytes, with a one-second delay in the middle, just to show how the API works. Since there is no underlying source of data, there is no reasonable way for it to respond to backpressure signals (i.e. there is no TCP socket to pause or the like). But it shows the mechanism for how the body of `onRead` interacts with `read`.
```js
// ab2str and str2ab from http://stackoverflow.com/a/11058858/3191

const s = new ReadableStream({ highWaterMark: 1024 }, (push, done, error) => {
    push(str2ab("123"));
    push(str2ab("456"));

    setTimeout(() => {
        push(str2ab("789"));
        done();
        push(str2ab("101112")); // ignored
        error(new Error("whatever")); // ignored
    }, 1000);
});
```
```js
s.read().then(val => {
    assert(ab2str(val) === "123456"); // all currently-available bytes, concatenated

    s.read().then(val => {
        assert(ab2str(val) === "789"); // fulfilled once the timeout fires

        s.read().then(val => {
            assert(val === undefined); // no bytes left in the stream
        });
    });
});
```
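To make the contract above concrete, here is a minimal userland sketch of these semantics. It uses strings rather than `ArrayBuffer`s, leaning on the earlier note that any "concatable" data works, and the `SketchReadableStream` name and all internals are invented for illustration; this is not a proposed implementation.

```js
// Minimal sketch of the described semantics: onRead runs on first read(),
// push() concatenates into a buffer and reports backpressure against
// highWaterMark, done()/error() end the stream, and reads settle at the
// end of the microtask so synchronous pushes coalesce into one result.
class SketchReadableStream {
  constructor({ highWaterMark }, onRead) {
    this._highWaterMark = highWaterMark;
    this._onRead = onRead;
    this._buffer = "";
    this._state = "readable"; // or "done" / "errored"
    this._pendingReads = [];
    this._started = false;
  }

  read() {
    if (!this._started) {
      this._started = true;
      this._onRead(
        data => this._push(data),
        () => this._finish("done"),
        err => { this._error = err; this._finish("errored"); }
      );
    }
    return new Promise((resolve, reject) => {
      this._pendingReads.push({ resolve, reject });
      this._flush();
    });
  }

  _push(data) {
    if (this._state !== "readable") return false; // ignored after done/error
    this._buffer += data;
    this._flush();
    return this._buffer.length < this._highWaterMark;
  }

  _finish(state) {
    if (this._state !== "readable") return; // already finished; ignore
    this._state = state;
    this._flush();
  }

  _flush() {
    // Settle pending reads in a microtask so that all synchronous pushes
    // are concatenated into a single fulfillment value.
    Promise.resolve().then(() => {
      while (this._pendingReads.length > 0) {
        if (this._buffer.length > 0) {
          const { resolve } = this._pendingReads.shift();
          const data = this._buffer;
          this._buffer = "";
          resolve(data);
        } else if (this._state === "done") {
          this._pendingReads.shift().resolve(undefined);
        } else if (this._state === "errored") {
          this._pendingReads.shift().reject(this._error);
        } else {
          break; // still waiting for data
        }
      }
    });
  }
}
```

Run against the contrived example above (with strings instead of `ArrayBuffer`s), this sketch produces the same three results: `"123456"`, then `"789"`, then `undefined`.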
This example adapts an incoming stream of websocket messages to a `ReadableStream`. Since websockets do not expose a method for sending pause messages to the underlying socket, we do not respond to backpressure here.
```js
const socket = new WebSocket("ws://example.com/push-me-data");
socket.binaryType = "arraybuffer";

const stream = new ReadableStream({ highWaterMark: 1024 }, (push, done, error) => {
    socket.onerror = () => {
        error(new Error("WebSocket error"));
    };
    socket.onmessage = event => {
        push(event.data);
    };
    socket.onclose = done;
});
```
TODO: maybe build an ad-hoc websocket protocol where we tell the server to stop sending via some message.
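One way that ad-hoc protocol could look is sketched below. Everything here is invented for illustration: the `"pause"`/`"resume"` message strings, the `makeBackpressureOnRead` helper, and the assumption that the server honors those messages.

```js
// Sketch of an ad-hoc pause/resume protocol: when push() returns false,
// ask the server to stop sending; when onRead is called again (i.e. a
// consumer wants data), ask it to resume. The socket only needs send()
// plus assignable onmessage/onerror/onclose handlers.
function makeBackpressureOnRead(socket) {
  let paused = false;

  return function onRead(push, done, error) {
    if (paused) {
      socket.send("resume"); // a consumer wants data again
      paused = false;
    }

    socket.onmessage = event => {
      if (!push(event.data) && !paused) {
        socket.send("pause"); // buffer is past highWaterMark
        paused = true;
      }
    };
    socket.onerror = message => error(new Error("WebSocket error: " + message));
    socket.onclose = done;
  };
}
```

This relies on the `onRead`-is-called-again-when-data-is-wanted behavior described earlier, so the stream itself needs no extra API surface.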
- Do we need a way to signal loss of interest in the source of data ("abort")? This would avoid implementations buffering incoming data that nobody will use, or keeping open sockets or file descriptors that will never be read from again.
- Node doesn't have this in the base stream interface. But it has ad-hoc versions in certain streams.
- The benefit of having it in the base stream interface is that an abort signal could flow backward through piped streams.
- Concurrent read and abort? Reference counting?? How would you know? This might necessitate introducing a stream-reader-type class, ick.
- `done()` + `error(err)` vs. `done()` + `done(err)`.
- Options object as first argument to the constructor is annoying, since it makes it non-optional (or, if we did make it optional, `new ReadableStream(undefined, (...) => ...)` isn't exactly pretty). But putting it after the function parameter seems even worse.
- Is Node's `unshift` necessary? If so, is that really a good name?
  - Gut instinct: you could build it in userspace, so it's not technically necessary for the first draft.
- `{ boolean done, ArrayBuffer value }` instead of `ArrayBuffer|undefined` would negate needing one more `read()` call after the data is finished to find out if you're truly finished.
  - However, after exploring this, I think the parallel with generators is misplaced, and would be misleading.
  - It hurts the nice consumer API using generators (or ES7 `await`) that the promise version gives.
- The concatenating I do here is probably not necessary or desirable. Kill it if possible.
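The "build `unshift` in userspace" instinct above could look like this sketch. The `withUnshift` wrapper name is hypothetical, and this version supports only a single pushed-back chunk, just to show the idea.

```js
// Sketch: userspace unshift via a wrapper that holds pushed-back data and
// serves it before delegating to the wrapped stream's read().
function withUnshift(stream) {
  let pushedBack; // single pushed-back chunk, or undefined

  return {
    read() {
      if (pushedBack !== undefined) {
        const data = pushedBack;
        pushedBack = undefined;
        return Promise.resolve(data);
      }
      return stream.read();
    },
    unshift(data) {
      if (pushedBack !== undefined) {
        throw new Error("only one pushed-back chunk supported in this sketch");
      }
      pushedBack = data;
    },
  };
}
```

Since this composes over any object with a `read()` method, it supports the argument that `unshift` need not be in the base interface for a first draft.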
Writable streams are significantly simpler than readable streams. But I haven't written them down yet, eek!
- Should be sure to create mixin versions of these, for things that don't want to implement the constructor (e.g. built-in streams). This would allow easy creation of duplex streams as well.
Hi! I had some random discussions with sicking about this over the last two weeks. My use cases are heavily influenced not only by Node but also by what we planned for FirefoxOS (and my general sadness about the lack of "streams" in our TCP/HTTP APIs).

`done(err)` FTW if I can influence this somehow :)