[ES2017][WHATWG-Streams] Basic examples for the WHATWG Streams API (on Node.js)
// npm i web-streams-polyfill
const streams = require("web-streams-polyfill");
const {ReadableStream, WritableStream, TransformStream} = streams;
// (default) object stream
const rs1 = new ReadableStream({
  async start(controller) {
    // called by constructor
    console.log("[start]");
    controller.enqueue("a");
    controller.enqueue("b");
    controller.enqueue("c");
  },
  async pull(controller) {
    // called when a read is requested and the controller's queue is empty
    console.log("[pull]");
    controller.enqueue("d");
    controller.close(); // or controller.error();
  },
  async cancel(reason) {
    // called when rs.cancel(reason)
    console.log("[cancel]", reason);
  },
});
(async () => {
  // pull values via the ReadableStream's Reader
  const r1 = rs1.getReader();
  console.log("[1]", await r1.read()); //=> {value: "a", done: false}
  console.log("[2]", await r1.read()); //=> {value: "b", done: false}
  console.log("[3]", await r1.read()); //=> {value: "c", done: false}
  console.log("[4]", await r1.read()); //=> {value: "d", done: false}
  console.log("[5]", await r1.read()); //=> {value: undefined, done: true}
  /*
  // or read in a loop with await
  for (let n = await r1.read(); !n.done; n = await r1.read()) {
    console.log("[value]", n.value);
  }
  //*/
})().catch(console.error);
// npm i web-streams-polyfill
const streams = require("web-streams-polyfill");
const {ReadableStream, WritableStream, TransformStream} = streams;
// a byte-array-specialized stream
const rs1 = new ReadableStream({
  type: "bytes", //IMPORTANT
  async start(controller) {
    console.log("[start]");
    //IMPORTANT: enqueue() accepts only ArrayBufferView values
    controller.enqueue(new Uint32Array([1]));
    controller.enqueue(new Uint32Array([2]));
    controller.enqueue(new Uint32Array([3]));
  },
  async pull(controller) {
    console.log("[pull]");
    controller.enqueue(new Uint32Array([4]));
    controller.close();
  },
  async cancel(reason) {
    console.log("[cancel]", reason);
  },
});
(async () => {
  // pull values via the ReadableStream's Reader
  const r1 = rs1.getReader();
  // values are read as Uint8Array regardless of which ArrayBufferView class was enqueued
  console.log("[1]", await r1.read()); //=> {value: u8[1,0,0,0], done: false}
  console.log("[2]", await r1.read()); //=> {value: u8[2,0,0,0], done: false}
  console.log("[3]", await r1.read()); //=> {value: u8[3,0,0,0], done: false}
  console.log("[4]", await r1.read()); //=> {value: u8[4,0,0,0], done: false}
  console.log("[5]", await r1.read()); //=> {value: undefined, done: true}
  /*
  // or read in a loop with await
  for (let n = await r1.read(); !n.done; n = await r1.read()) {
    console.log("[value]", n.value);
  }
  //*/
})().catch(console.error);
// npm i web-streams-polyfill
const streams = require("web-streams-polyfill");
const {ReadableStream, WritableStream, TransformStream} = streams;
// writable stream
//NOTE: writable streams have no byte-array-specialized variant
const ws1 = new WritableStream({
  async start(controller) {
    // called by constructor
    console.log("[start]");
    // do some (asynchronous) initialization here
  },
  async write(chunk, controller) {
    // called when writer.write()
    console.log("[write]", chunk);
  },
  async close(controller) {
    console.log("[close]");
  },
  async abort(reason) {
    // called when ws.abort(reason)
    console.log("[abort]", reason);
  },
});
(async () => {
  const w1 = ws1.getWriter();
  //NOTE: write() and close() may reject with an error
  for (const v of ["a", "b", "c"]) {
    await w1.write(v);
  }
  await w1.close();
})().catch(console.error);
// NOTE on the difference between `close()` and `abort()`
// - close() runs only after all previous write()s have finished
// - abort() takes effect right away, even if a previous write() has not yet finished
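
// A minimal sketch (not part of the original examples) contrasting the two:
// close() would flush the queued "y" before closing, while abort() discards it.
const {WritableStream: WS} = require("web-streams-polyfill");
const wsAbort = new WS({
  async write(chunk) {
    console.log("[write]", chunk);
    await new Promise(r => setTimeout(r, 1000)); // slow sink
  },
  async close() {
    console.log("[close]"); // would run only after all queued writes finished
  },
  async abort(reason) {
    console.log("[abort]", reason); // queued writes are discarded
  },
});
(async () => {
  const w = wsAbort.getWriter();
  w.write("x"); // in-flight write (not awaited)
  w.write("y"); // queued write: close() would flush it, abort() drops it
  await w.abort("give up"); //=> [abort] give up ("y" is never written)
})().catch(console.error);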
// npm i web-streams-polyfill
const streams = require("web-streams-polyfill");
const {ReadableStream, WritableStream, TransformStream} = streams;
const {TextEncoder} = require("util");
//NOTE: TransformStream is not yet in the current standard spec
// transform stream
const ts1 = new TransformStream({
  async transform(chunk, controller) {
    console.log("[transform]", chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  async flush(controller) {
    console.log("[flush]");
    controller.close();
  },
});
(async () => {
  const rs = ts1.readable;
  const ws = ts1.writable;
  //NOTE: no await on write(): its promise resolves only after the chunk is read()
  const w = ws.getWriter();
  for (const ch of "abc") {
    w.write(ch);
  }
  w.close();
  const r = rs.getReader();
  for (let n = await r.read(); !n.done; n = await r.read()) {
    console.log("[value]", n.value);
  }
})().catch(console.error);
// npm i web-streams-polyfill
const streams = require("web-streams-polyfill");
const {ReadableStream, WritableStream, TransformStream} = streams;
const {TextEncoder} = require("util");
// NOTE: TransformStream is not yet in the standard, but
// a {readable, writable} pair can be used anywhere a TransformStream is expected
// simple implementation of a transform-stream-style readable/writable pair
function transformPair(transformer) {
  let rcontroller = null, wcontroller = null;
  const readable = new ReadableStream({
    type: transformer.type,
    async start(controller) {
      rcontroller = controller;
    },
    async cancel(reason) {
      wcontroller.error(reason);
    },
  });
  const writable = new WritableStream({
    async start(controller) {
      wcontroller = controller;
    },
    async write(chunk, controller) {
      await transformer.transform(chunk, rcontroller);
    },
    async close(controller) {
      await transformer.flush(rcontroller);
      rcontroller.close();
    },
    async abort(reason) {
      rcontroller.error(reason);
    },
  });
  return {readable, writable};
}
const ts1 = transformPair({
  type: "bytes",
  async transform(chunk, controller) {
    console.log("[transform]", chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  async flush(controller) {
    console.log("[flush]");
  },
});
(async () => {
  const rs = ts1.readable;
  const ws = ts1.writable;
  const w = ws.getWriter();
  for (const ch of "abc") {
    await w.write(ch);
  }
  await w.close();
  const r = rs.getReader();
  for (let n = await r.read(); !n.done; n = await r.read()) {
    console.log("[value]", n.value);
  }
})().catch(console.error);
// npm i web-streams-polyfill
const streams = require("web-streams-polyfill");
const {ReadableStream, WritableStream, TransformStream} = streams;
// (default) object stream
const rs1 = new ReadableStream({
  async start(controller) {
    // called by constructor
    console.log("[start]");
    controller.enqueue("a");
    controller.enqueue("b");
    controller.enqueue("c");
  },
  async pull(controller) {
    // called when a read is requested and the controller's queue is empty
    console.log("[pull]");
    controller.enqueue("d");
    controller.close(); // or controller.error();
  },
  async cancel(reason) {
    // called when rs.cancel(reason)
    console.log("[cancel]", reason);
  },
});
(async () => {
  // rs.tee() duplicates the stream into two ReadableStreams
  const [rsA, rsB] = rs1.tee();
  const r1 = rsA.getReader();
  console.log("[A 1]", await r1.read()); //=> {value: "a", done: false}
  console.log("[A 2]", await r1.read()); //=> {value: "b", done: false}
  console.log("[A 3]", await r1.read()); //=> {value: "c", done: false}
  console.log("[A 4]", await r1.read()); //=> {value: "d", done: false}
  console.log("[A 5]", await r1.read()); //=> {value: undefined, done: true}
  // or read in a loop with await
  const r2 = rsB.getReader();
  for (let n = await r2.read(); !n.done; n = await r2.read()) {
    console.log("[B value]", n.value);
  }
})().catch(console.error);
// NOTE on rsA and rsB cancel()
// - the source rs is cancelled only after both rsA and rsB are cancelled
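
// A minimal sketch (not part of the original examples): the source's cancel()
// fires only once BOTH branches have been cancelled.
const {ReadableStream: RS} = require("web-streams-polyfill");
const src = new RS({
  async cancel(reason) {
    console.log("[source cancel]", reason); //=> composite reason of both branches
  },
});
const [brA, brB] = src.tee();
(async () => {
  const pA = brA.cancel("A done"); // source cancel() not called yet
  const pB = brB.cancel("B done"); // now [source cancel] fires
  await Promise.all([pA, pB]);
  console.log("[both branches cancelled]");
})().catch(console.error);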
// npm i web-streams-polyfill
const streams = require("web-streams-polyfill");
const {ReadableStream, WritableStream, TransformStream} = streams;
// (default) object stream
const rs1 = new ReadableStream({
  async start(controller) {
    // called by constructor
    console.log("[start readable]");
    controller.enqueue("a");
    controller.enqueue("b");
    controller.enqueue("c");
  },
  async pull(controller) {
    // called when a read is requested and the controller's queue is empty
    console.log("[pull]");
    controller.enqueue("d");
    controller.close(); // or controller.error();
  },
  async cancel(reason) {
    // called when rs.cancel(reason)
    console.log("[cancel]", reason);
  },
});
// writable stream
const ws1 = new WritableStream({
  async start(controller) {
    // called by constructor
    console.log("[start writable]");
  },
  async write(chunk, controller) {
    // called when writer.write()
    console.log("[write]", chunk);
  },
  async close(controller) {
    console.log("[close]");
  },
  async abort(reason) {
    console.log("[abort]", reason);
  },
});
(async () => {
  // rs.pipeTo(ws): the readable sends its chunks to the writable
  await rs1.pipeTo(ws1); //NOTE: the returned Promise resolves after the writable is closed
  console.log("[finished]");
})().catch(console.error);
// NOTE on rs.pipeTo(ws, opts)
// - {preventClose = false, preventAbort = false, preventCancel = false} = opts
// These options skip the corresponding call on the other side in these situations:
// - rsc.close() => ws.close()   (skipped by preventClose)
// - rsc.error() => ws.abort()   (skipped by preventAbort)
// - wsc.error() => rs.cancel()  (skipped by preventCancel)
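
// A minimal sketch (not part of the original examples) of preventClose:
// the first pipeTo() skips ws.close(), so a second readable can pipe into
// the same writable afterwards.
const {ReadableStream: RS, WritableStream: WS} = require("web-streams-polyfill");
const mkRS = (...chunks) => new RS({
  start(controller) {
    for (const c of chunks) controller.enqueue(c);
    controller.close();
  },
});
const sink = new WS({
  write(chunk) { console.log("[write]", chunk); },
  close() { console.log("[close]"); },
});
(async () => {
  await mkRS("a", "b").pipeTo(sink, {preventClose: true}); // ws.close() skipped
  await mkRS("c", "d").pipeTo(sink); //=> writes c, d then [close]
})().catch(console.error);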
// npm i web-streams-polyfill
const streams = require("web-streams-polyfill");
const {ReadableStream, WritableStream, TransformStream} = streams;
const {TextEncoder} = require("util");
//NOTE: TransformStream is not yet in the standard spec
// transform stream
const ts1 = new TransformStream({
  async transform(chunk, controller) {
    console.log("[transform]", chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  async flush(controller) {
    console.log("[flush]");
    controller.close();
  },
});
const rs1 = new ReadableStream({
  async start(controller) {
    // called by constructor
    console.log("[start]");
    controller.enqueue("a");
    controller.enqueue("b");
    controller.enqueue("c");
  },
  async pull(controller) {
    // called when a read is requested and the controller's queue is empty
    console.log("[pull]");
    controller.enqueue("d");
    controller.close(); // or controller.error();
  },
  async cancel(reason) {
    // called when rs.cancel(reason)
    console.log("[cancel]", reason);
  },
});
(async () => {
  // rs.pipeThrough(ts): pipes rs into ts.writable
  // - and returns ts.readable
  const r = rs1.pipeThrough(ts1).getReader();
  for (let n = await r.read(); !n.done; n = await r.read()) {
    console.log("[value]", n.value);
  }
})().catch(console.error);
// npm i web-streams-polyfill
const streams = require("web-streams-polyfill");
const {ReadableStream, WritableStream, TransformStream} = streams;
// writable stream
const ws1 = new WritableStream({
  async start(controller) {
    console.log("[start]");
  },
  async write(chunk, controller) {
    console.log("[write]", chunk);
    await new Promise(r => setTimeout(r, 1000)); // delay before the next write
  },
  async close(controller) {
    console.log("[close]");
  },
  async abort(reason) {
    console.log("[abort]", reason);
  },
}, {highWaterMark: 2});
//NOTE: highWaterMark is the queued-chunk count at which backpressure starts (default 1)
//- not a hard queue-size limit, but the count at which `ready` begins to wait
(async () => {
  const w1 = ws1.getWriter();
  const start = Date.now();
  for (const v of "abcd") {
    //*
    await w1.ready; //IMPORTANT: wait until the queue has room
    console.log("[ready]", Date.now() - start, "msec");
    w1.write(v); // this Promise resolves after the write finishes
    //*/
    // compare: the timing when awaiting write() instead of ready:
    /*
    w1.ready; // not awaited here
    console.log("[ready]", Date.now() - start, "msec");
    await w1.write(v); // this Promise resolves after the write finishes
    //*/
  }
  await w1.close();
})().catch(console.error);
// npm i web-streams-polyfill
const streams = require("web-streams-polyfill");
const {ReadableStream, WritableStream, TransformStream} = streams;
// example of cancel()
const rs1 = new ReadableStream({
  async start(controller) {
    console.log("[start]");
  },
  async pull(controller) {
    console.log("[pull]");
    await new Promise((res, rej) => {}); // pending forever
  },
  async cancel(reason) {
    console.log("[cancel]", reason);
  },
});
const r1 = rs1.getReader();
// read() stays pending until cancel()
(async () => {
  console.log(await r1.read()); //=> {value: undefined, done: true}
})().catch(console.error);
// cancel after 500msec
(async () => {
  await new Promise(r => setTimeout(r, 500));
  r1.cancel("timeout");
})().catch(console.error);
//NOTE on `Reader` and locks
//- `r = rs.getReader()` creates a new reader object that holds a lock
//  - a second `r2 = rs.getReader()` throws a TypeError because rs is locked
//- `r.releaseLock()` releases the lock
//  - after that, a new reader can be obtained with `r2 = rs.getReader()`
//- only the single reader object can call `r.read()`/`r.cancel()`; the lock enforces this
//  - `rs.cancel()` is also blocked while a reader holds the lock
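
// A minimal sketch (not part of the original examples) of reader locking:
const {ReadableStream: RS} = require("web-streams-polyfill");
const rsLock = new RS({});
const rA = rsLock.getReader(); // rsLock is now locked (rsLock.locked === true)
try {
  rsLock.getReader(); // second reader while locked
} catch (err) {
  console.log("[locked]", err.constructor.name); //=> TypeError
}
rA.releaseLock(); // release the lock
const rB = rsLock.getReader(); // allowed again
console.log("[locked again]", rsLock.locked); //=> true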
//NOTE (node >= 8.8.0): run with `node --harmony_async_iteration`
// npm i web-streams-polyfill
const streams = require("web-streams-polyfill");
const {ReadableStream, WritableStream, TransformStream} = streams;
// (default) object stream
const rs1 = new ReadableStream({
  async start(controller) {
    // called by constructor
    console.log("[start]");
    controller.enqueue("a");
    controller.enqueue("b");
    controller.enqueue("c");
  },
  async pull(controller) {
    // called when a read is requested and the controller's queue is empty
    console.log("[pull]");
    controller.enqueue("d");
    controller.close(); // or controller.error();
  },
  async cancel(reason) {
    // called when rs.cancel(reason)
    console.log("[cancel]", reason);
  },
});
// Async Iteration proposal: async generator functions and the for-await-of loop
//- https://github.com/tc39/proposal-async-iteration
// `AsyncGeneratorFunction` syntax
async function* agen1(nextP) {
  for (let n = await nextP(); !n.done; n = await nextP()) yield n.value;
}
// - `async function*` returns an object with a `Symbol.asyncIterator` method
// - the `Symbol.asyncIterator` method returns an object with a `next` method
// - the `next` method returns a `Promise` of `{value, done}`
//   - which has the same meaning as the basic ES6 iterator's `{value, done}`
function agen2(nextP) {
  // emulate the `async function*` above with a basic function
  return {
    [Symbol.asyncIterator]() {return {next: nextP};},
  };
}
(async () => {
  // pull values via the ReadableStream's Reader
  const r1 = rs1.getReader();
  // for-await-of loop syntax:
  //* case 1: built-in AsyncGeneratorFunction
  for await (const v of agen1(_ => r1.read())) console.log(v);
  //*/
  /* case 2: asyncIterator object returned from a basic function
  for await (const v of agen2(_ => r1.read())) console.log(v);
  //*/
  /* case 3: inlined asyncIterator object
  for await (const v of {[Symbol.asyncIterator]() {return {next() {
    return r1.read();
  }};}}) console.log(v);
  //*/
})().catch(console.error);
// npm i web-streams-polyfill
const streams = require("web-streams-polyfill");
const {ReadableStream, WritableStream, TransformStream} = streams;
// source for object streams
class Producer {
  constructor(name, max = 10, count = 0) {
    this.name = name;
    this.max = max;
    this.count = count;
  }
  async start(controller) {
    console.log(`[start ${this.name}]`);
  }
  async pull(controller) {
    console.log(`[pull ${this.name}]: ${++this.count}`);
    controller.enqueue(`${this.name} ${this.count}`);
    if (this.count === this.max) controller.close();
  }
  async cancel(reason) {
    console.log(`[cancel ${this.name}]`, reason);
  }
}
// sink for object streams
class Consumer {
  constructor(name, max = 100, count = 0) {
    this.name = name;
    this.max = max;
    this.count = count;
  }
  async start(controller) {
    console.log(`[start ${this.name}]`);
  }
  async write(chunk, controller) {
    if (this.count === this.max) {
      controller.error(`[reach max] ${this.max}`);
    } else {
      console.log(`[write ${this.name} ${++this.count}]: ${chunk}`);
    }
  }
  async close(controller) {
    console.log(`[close ${this.name}]`);
  }
  async abort(reason) {
    console.log(`[abort ${this.name}]: ${reason}`);
  }
}
// non-locking pipeTo-style helper that takes a `Writer`, allowing parallel
// read-to-write from multiple ReadableStreams into one writable
function pipeToWriter(
    rs, w, {preventClose, preventCancel, preventAbort} = {}) {
  let wactive = true;
  const r = rs.getReader();
  const rclosed = r.closed.then(value => {
    if (!preventClose && wactive) {
      wactive = false;
      return w.close().then(_ => value);
    }
    return value;
  }, error => {
    if (!preventClose && wactive) {
      wactive = false;
      return w.close().then(_ => {throw error;});
    }
    throw error;
  });
  const wclosed = w.closed.then(value => {
    wactive = false;
    if (!preventCancel) return r.cancel().then(_ => {
      r.releaseLock();
      return value;
    });
    r.releaseLock();
    return value;
  }, reason => {
    wactive = false;
    if (!preventCancel) return r.cancel(reason).then(_ => {
      r.releaseLock();
      throw reason;
    });
    r.releaseLock();
    throw reason;
  });
  function onReadError(error) {
    if (!preventAbort) w.abort(error);
    return {value: undefined, done: true};
  }
  async function loop() {
    while (true) {
      await w.ready;
      const {value, done} = await r.read().catch(onReadError);
      if (done) break;
      await w.write(value);
    }
  }
  return Promise.race([loop().catch(err => {}), rclosed, wclosed]);
}
(async () => {
  //* case 1: two streams sequentially with pipeTo()
  console.log("X: A1, A2, A3, A4, A5, A6, A7, A8, B1, B2");
  const rsA = new ReadableStream(new Producer("A", 8));
  const rsB = new ReadableStream(new Producer("B", 8));
  const wsX = new WritableStream(new Consumer("X", 10));
  // pipeTo locks the writable, so the next pipeTo must await the previous one
  await rsA.pipeTo(wsX, {preventClose: true});
  //NOTE: pipeTo rejects when the writer errors
  await rsB.pipeTo(wsX).catch(err => {}); // writes until "B 2"
  console.log();
  //*/
  //* case 2: parallel with pipeToWriter()
  console.log("Y: C1, D1, C2, D2, C3, D3, C4, D4, C5, D5");
  const rsC = new ReadableStream(new Producer("C", 8));
  const rsD = new ReadableStream(new Producer("D", 8));
  const wsY = new WritableStream(new Consumer("Y", 10));
  const wY = wsY.getWriter();
  await Promise.all([pipeToWriter(rsC, wY), pipeToWriter(rsD, wY)]).catch(console.error);
  console.log();
  //*/
  //* case 3: merge into a single ReadableStream, using a TransformStream as a queue
  console.log("Z: E1, F1, E2, F2, E3, F3, E4, F4, E5, F5");
  const rsE = new ReadableStream(new Producer("E", 8));
  const rsF = new ReadableStream(new Producer("F", 8));
  const ts = new TransformStream();
  const tsw = ts.writable.getWriter();
  pipeToWriter(rsE, tsw, {preventClose: true});
  pipeToWriter(rsF, tsw, {preventClose: true});
  const wsZ = new WritableStream(new Consumer("Z", 10));
  await ts.readable.pipeTo(wsZ).catch(err => {});
  console.log();
  //*/
})().catch(console.error);
@bellbind commented Nov 2, 2017:

My updated submodules and devDependencies version:

npm i bellbind/web-streams-polyfill (pulling the recursive submodules takes quite a long time)
