[ES2017][WHATWG-Streams] Basic examples for WHATWG Streams API (on nodejs)
// npm i web-streams-polyfill | |
const streams = require("web-streams-polyfill"); | |
const {ReadableStream, WritableStream, TransformStream} = streams; | |
// (default) object stream | |
const rs1 = new ReadableStream({ | |
async start(controller) { | |
// called by constructor | |
console.log("[start]"); | |
controller.enqueue("a"); | |
controller.enqueue("b"); | |
controller.enqueue("c"); | |
}, | |
async pull(controller) { | |
// called read when controller's queue is empty | |
console.log("[pull]"); | |
controller.enqueue("d"); | |
controller.close(); // or controller.error(); | |
}, | |
async cancel(reason) { | |
// called when rs.cancel(reason) | |
console.log("[cancel]", reason); | |
}, | |
}); | |
(async () => { | |
// pulling to use the `Reader` of `ReadableStream` | |
const r1 = rs1.getReader(); | |
console.log("[1]", await r1.read()); //=> {value: "a", done: false} | |
console.log("[2]", await r1.read()); //=> {value: "b", done: false} | |
console.log("[3]", await r1.read()); //=> {value: "c", done: false} | |
console.log("[4]", await r1.read()); //=> {value: "d", done: false} | |
console.log("[5]", await r1.read()); //=> {value: undefined, done: true} | |
/* | |
// or as in loop await read() | |
for (let n = await r1.read(); !n.done; n = await r1.read()) { | |
console.log("[value]", n.value); | |
} | |
//*/ | |
})().catch(console.error); |
// npm i web-streams-polyfill | |
const streams = require("web-streams-polyfill"); | |
const {ReadableStream, WritableStream, TransformStream} = streams; | |
// as byte array specialized stream | |
const rs1 = new ReadableStream({ | |
type: "bytes", //IMPORTANT | |
async start(controller) { | |
console.log("[start]"); | |
//IMPORTANT: limits enqueue() with some ArrayBufferView | |
controller.enqueue(new Uint32Array([1])); | |
controller.enqueue(new Uint32Array([2])); | |
controller.enqueue(new Uint32Array([3])); | |
}, | |
async pull(controller) { | |
console.log("[pull]"); | |
controller.enqueue(new Uint32Array([4])); | |
controller.close(); | |
}, | |
async cancel(reason) { | |
console.log("[cancel]", reason); | |
}, | |
}); | |
(async () => { | |
// pulling to use the `Reader` of `ReadableStream` | |
const r1 = rs1.getReader(); | |
// read value as Uint8Array regardless of enqueue-ing ArrayBufferView class | |
console.log("[1]", await r1.read()); //=> {value: u8[1,0,0,0], done: false} | |
console.log("[2]", await r1.read()); //=> {value: u8[2,0,0,0], done: false} | |
console.log("[3]", await r1.read()); //=> {value: u8[3,0,0,0], done: false} | |
console.log("[4]", await r1.read()); //=> {value: u8[4,0,0,0], done: false} | |
console.log("[5]", await r1.read()); //=> {value: undefined, done: true} | |
/* | |
// or as in loop await read() | |
for (let n = await r1.read(); !n.done; n = await r1.read()) { | |
console.log("[value]", n.value); | |
} | |
//*/ | |
})().catch(console.error); |
// npm i web-streams-polyfill | |
const streams = require("web-streams-polyfill"); | |
const {ReadableStream, WritableStream, TransformStream} = streams; | |
// writable stream | |
//NOTE: writable has no byte-array specific implementations | |
const ws1 = new WritableStream({ | |
async start(controller) { | |
// called by constructor | |
console.log("[start]"); | |
// to do some (asynchronous) initialization | |
}, | |
async write(chunk, controller) { | |
// called when writer.write() | |
console.log("[write]", chunk); | |
}, | |
async close(controller) { | |
console.log("[close]"); | |
}, | |
async abort(reason) { | |
// called when ws.abort(reason) | |
console.log("[abort]", reason); | |
} | |
}); | |
(async () => { | |
console.log(ws1.getWriter); | |
const w1 = ws1.getWriter(); | |
//NOTE: write and close may spawn error | |
for (const v of ["a", "b", "c"]) { | |
await w1.write(v); | |
} | |
await w1.close(); | |
})().catch(console.error); | |
// NOTE on difference between `close()` and `abort()` | |
// - close() is sequentially called after former write()s are finished | |
// - abort() is quickly called even if former write() is not yet finished |
// npm i web-streams-polyfill | |
const streams = require("web-streams-polyfill"); | |
const {ReadableStream, WritableStream, TransformStream} = streams; | |
const {TextEncoder} = require("util"); | |
//NOTE: TransformStream is not yet in current standard spec | |
// transform stream | |
const ts1 = new TransformStream({ | |
async transform(chunk, controller) { | |
console.log("[transform]", chunk); | |
controller.enqueue(new TextEncoder().encode(chunk)); | |
}, | |
async flush(controller) { | |
console.log("[flush]"); | |
controller.close(); | |
}, | |
}); | |
(async () => { | |
const rs = ts1.readable; | |
const ws = ts1.writable; | |
//NOTE: no await because the returned promise is resolved after read() | |
const w = ws.getWriter(); | |
for (const ch of "abc") { | |
w.write(ch); | |
} | |
w.close(); | |
const r = rs.getReader(); | |
for (let n = await r.read(); !n.done; n = await r.read()) { | |
console.log("[value]", n.value); | |
} | |
})().catch(console.error); |
// npm i web-streams-polyfill | |
const streams = require("web-streams-polyfill"); | |
const {ReadableStream, WritableStream, TransformStream} = streams; | |
const {TextEncoder} = require("util"); | |
// NOTE: TransformStream is not yet in standard, but | |
// the {readable, writable} object can be used as TransformStream in anywhere | |
// simple implementation for transform stream style readable/writable pair | |
function transformPair(transformer) { | |
let rcontroller = null, wcontroller = null; | |
const readable = new ReadableStream({ | |
type: transformer.type, | |
async start(controller) { | |
rcontroller = controller; | |
}, | |
async cancel(reason) { | |
wcontroller.error(reason); | |
}, | |
}); | |
const writable = new WritableStream({ | |
async start(controller) { | |
wcontroller = controller; | |
}, | |
async write(chunk, controller) { | |
await transformer.transform(chunk, rcontroller); | |
}, | |
async close(controller) { | |
await transformer.flush(rcontroller); | |
await rcontroller.close(); | |
}, | |
async abort(reason) { | |
await rcontroller.error(reason); | |
}, | |
}); | |
return {readable, writable}; | |
} | |
const ts1 = transformPair({ | |
type: "bytes", | |
async transform(chunk, controller) { | |
console.log("[transform]", chunk); | |
controller.enqueue(new TextEncoder().encode(chunk)); | |
}, | |
async flush(controller) { | |
console.log("[flush]"); | |
}, | |
}); | |
(async () => { | |
const rs = ts1.readable; | |
const ws = ts1.writable; | |
const w = ws.getWriter(); | |
for (const ch of "abc") { | |
await w.write(ch); | |
} | |
await w.close(); | |
const r = rs.getReader(); | |
for (let n = await r.read(); !n.done; n = await r.read()) { | |
console.log("[value]", n.value); | |
} | |
})().catch(console.error); |
// npm i web-streams-polyfill | |
const streams = require("web-streams-polyfill"); | |
const {ReadableStream, WritableStream, TransformStream} = streams; | |
// (default) object stream | |
const rs1 = new ReadableStream({ | |
async start(controller) { | |
// called by constructor | |
console.log("[start]"); | |
controller.enqueue("a"); | |
controller.enqueue("b"); | |
controller.enqueue("c"); | |
}, | |
async pull(controller) { | |
// called read when controller's queue is empty | |
console.log("[pull]"); | |
controller.enqueue("d"); | |
controller.close(); // or controller.error(); | |
}, | |
async cancel(reason) { | |
// called when rs.cancel(reason) | |
console.log("[cancel]", reason); | |
}, | |
}); | |
(async () => { | |
// rs.tree() duplicate two ReadableStreams | |
const [rsA, rsB] = rs1.tee(); | |
const r1 = rsA.getReader(); | |
console.log("[A 1]", await r1.read()); //=> {value: "a", done: false} | |
console.log("[A 2]", await r1.read()); //=> {value: "b", done: false} | |
console.log("[A 3]", await r1.read()); //=> {value: "c", done: false} | |
console.log("[A 4]", await r1.read()); //=> {value: "d", done: false} | |
console.log("[A 5]", await r1.read()); //=> {value: undefined, done: true} | |
// or as in loop await read() | |
const r2 = rsB.getReader(); | |
for (let n = await r2.read(); !n.done; n = await r2.read()) { | |
console.log("[B value]", n.value); | |
} | |
})().catch(console.error); | |
// NOTE on rsA and rsB cancel() | |
// - rs canceled after both rsA and rsB are cancelled |
// npm i web-streams-polyfill | |
const streams = require("web-streams-polyfill"); | |
const {ReadableStream, WritableStream, TransformStream} = streams; | |
// (default) object stream | |
const rs1 = new ReadableStream({ | |
async start(controller) { | |
// called by constructor | |
console.log("[start readable]"); | |
controller.enqueue("a"); | |
controller.enqueue("b"); | |
controller.enqueue("c"); | |
}, | |
async pull(controller) { | |
// called read when controller's queue is empty | |
console.log("[pull]"); | |
controller.enqueue("d"); | |
controller.close(); // or controller.error(); | |
}, | |
async cancel(reason) { | |
// called when rs.cancel(reason) | |
console.log("[cancel]", reason); | |
}, | |
}); | |
// writable stream | |
const ws1 = new WritableStream({ | |
async start(controller) { | |
// called by constructor | |
console.log("[start writable]"); | |
}, | |
async write(chunk, controller) { | |
// called when writer.write() | |
console.log("[write]", chunk); | |
}, | |
async close(controller) { | |
console.log("[close]"); | |
}, | |
async abort(reason) { | |
console.log("[abort]", reason); | |
} | |
}); | |
(async () => { | |
// rs.pipeTo(ws): readable sends to writable | |
await rs1.pipeTo(ws1); //NOTE: returned Promise waits writable closed | |
console.log("[finished]"); | |
})().catch(console.error); | |
// NOTE on rs.pipeTo(ws, opts) | |
// - {preventClose = false, preventAbort = false, preventCancel = false} = opts | |
// These prevents are just ignoring to source/sink callbacks at the situations: | |
// - rsc.close() => ws.close() | |
// - rsc.error() => ws.abort() | |
// - wsc.error() => rs.cancel() |
// npm i web-streams-polyfill | |
const streams = require("web-streams-polyfill"); | |
const {ReadableStream, WritableStream, TransformStream} = streams; | |
const {TextEncoder} = require("util"); | |
//NOTE: TransformSTream is not yet in standard spec | |
// transform stream | |
const ts1 = new TransformStream({ | |
async transform(chunk, controller) { | |
console.log("[transform]", chunk); | |
controller.enqueue(new TextEncoder().encode(chunk)); | |
}, | |
async flush(controller) { | |
console.log("[flush]"); | |
controller.close(); | |
}, | |
}); | |
const rs1 = new ReadableStream({ | |
async start(controller) { | |
// called by constructor | |
console.log("[start]"); | |
controller.enqueue("a"); | |
controller.enqueue("b"); | |
controller.enqueue("c"); | |
}, | |
async pull(controller) { | |
// called read when controller's queue is empty | |
console.log("[pull]"); | |
controller.enqueue("d"); | |
controller.close(); // or controller.error(); | |
}, | |
async cancel(reason) { | |
// called when rs.cancel(reason) | |
console.log("[cancel]", reason); | |
}, | |
}); | |
(async () => { | |
// rs.pipeThrough(ts): rs send to ts.writable | |
// - returns ts.readable | |
const r = rs1.pipeThrough(ts1).getReader(); | |
for (let n = await r.read(); !n.done; n = await r.read()) { | |
console.log("[value]", n.value); | |
} | |
})().catch(console.error); |
// npm i web-streams-polyfill | |
const streams = require("web-streams-polyfill"); | |
const {ReadableStream, WritableStream, TransformStream} = streams; | |
// writable stream | |
const ws1 = new WritableStream({ | |
async start(controller) { | |
console.log("[start]"); | |
}, | |
async write(chunk, controller) { | |
console.log("[write]", chunk); | |
await new Promise(r => setTimeout(r, 1000)); // wait to next write | |
}, | |
async close(controller) { | |
console.log("[close]"); | |
}, | |
async abort(reason) { | |
console.log("[abort]", reason); | |
} | |
}, {highWaterMark: 2}); | |
//NOTE: highWaterMark: set queueing count to wait writing (default 1) | |
//- not a queue size limit, but a count to start waiting as ready | |
(async () => { | |
const w1 = ws1.getWriter(); | |
const start = Date.now(); | |
for (const v of "abcd") { | |
//* | |
await w1.ready; //IMPORTANT: wait to add write queue | |
console.log("[ready]", Date.now() - start, "msec"); | |
w1.write(v); // the Promise resolved after finish to write | |
//*/ | |
// compare: the difference with awaiting ready and write as: | |
/* | |
w1.ready; //IMPORTANT: wait to add write queue | |
console.log("[ready]", Date.now() - start, "msec"); | |
await w1.write(v); // the Promise resolved after finish to write | |
//*/ | |
} | |
await w1.close(); | |
})().catch(console.error); |
// npm i web-streams-polyfill | |
const streams = require("web-streams-polyfill"); | |
const {ReadableStream, WritableStream, TransformStream} = streams; | |
// example of cancel() | |
const rs1 = new ReadableStream({ | |
async start(controller) { | |
console.log("[start]"); | |
}, | |
async pull(controller) { | |
console.log("[read]"); | |
await new Promise((res, rej) => {}); // pending forever | |
}, | |
async cancel(reason) { | |
console.log("[cancel]", reason); | |
}, | |
}); | |
const r1 = rs1.getReader(); | |
// read await until cancel | |
(async () => { | |
console.log(await r1.read()); //=> {value: undefined, done: true} | |
})().catch(console.error); | |
// cancel after 500msec | |
(async () => { | |
await new Promise(r => setTimeout(r, 500)); | |
r1.cancel("timeout"); | |
})().catch(console.error); | |
//NOTE on `Reader` and lock | |
//- `r = rs.getReader()` create a new reader object that has a lock | |
// - next `r2 = rs.getReader()` spawn TypeError because the rs is locked | |
//- `r.releaseLock()` release its lock | |
// - it can get a new reader by `r2 = rs.getReader()` | |
//- single reader object can do `r.read()`/`r.cancel()`, it limited by the lock | |
// - `rs.cancel()` is also locked by a reader objecct |
//NOTE: (node >=8.8.0): use the option `node --harmony_async_iteration` to run | |
// npm i web-streams-polyfill | |
const streams = require("web-streams-polyfill"); | |
const {ReadableStream, WritableStream, TransformStream} = streams; | |
// (default) object stream | |
const rs1 = new ReadableStream({ | |
async start(controller) { | |
// called by constructor | |
console.log("[start]"); | |
controller.enqueue("a"); | |
controller.enqueue("b"); | |
controller.enqueue("c"); | |
}, | |
async pull(controller) { | |
// called read when controller's queue is empty | |
console.log("[pull]"); | |
controller.enqueue("d"); | |
controller.close(); // or controller.error(); | |
}, | |
async cancel(reason) { | |
// called when rs.cancel(reason) | |
console.log("[cancel]", reason); | |
}, | |
}); | |
// Async Iterator Proposal: async generator function and for-await-of loop | |
//- https://github.com/tc39/proposal-async-iteration | |
// `AsyncGeneratorFunction` syntax | |
async function* agen1(nextP) { | |
for (let n = await nextP(); !n.done; n = await nextP()) yield n.value; | |
} | |
// - `async function*` returns an object with the `Symbol.aysncIterator` method | |
// - The `Symbol.asyncIterator` method returns an object with `next` method | |
// - The `next` method returns a `Promise` of `{value, done}` | |
// - which has same means with basic ES6 iterator's `{value, done}` | |
function agen2(nextP) { | |
// emulate with basic function as above `async function*` code | |
return { | |
[Symbol.asyncIterator]() {return {next: nextP};} | |
}; | |
} | |
(async () => { | |
// pulling to use the `Reader` of `ReadableStream` | |
const r1 = rs1.getReader(); | |
// for-await-of loop syntax: | |
//* case 1: builtin AsyncGeneratorFunction | |
for await (const v of agen1(_ => r1.read())) console.log(v); | |
//*/ | |
/* case 2: asyncIterator object retured basic function | |
for await (const v of agen2(_ => r1.read())) console.log(v); | |
//*/ | |
/* case 3: inlined asyncIterator object | |
for await (const v of {[Symbol.asyncIterator]() {return {next() { | |
return r1.read(); | |
}};}}) console.log(v); | |
//*/ | |
})().catch(console.error); |
// npm i web-streams-polyfill | |
const streams = require("web-streams-polyfill"); | |
const {ReadableStream, WritableStream, TransformStream} = streams; | |
// source for object streams | |
class Producer { | |
constructor(name, max = 10, count = 0) { | |
this.name = name; | |
this.max = max; | |
this.count = count; | |
} | |
async start(controller) { | |
console.log(`[start ${this.name}]`); | |
} | |
async pull(controller) { | |
console.log(`[pull ${this.name}]: ${++this.count}`); | |
controller.enqueue(`${this.name} ${this.count}`); | |
if (this.count === this.max) controller.close(); | |
} | |
async cancel(reason) { | |
console.log(`[cancel ${this.name}]`, reason); | |
} | |
} | |
// sink for object stream | |
class Consumer { | |
constructor(name, max = 100, count = 0) { | |
this.name = name; | |
this.max = max; | |
this.count = count; | |
} | |
async start(controller) { | |
console.log(`[start ${this.name}]`); | |
} | |
async write(chunk, controller) { | |
if (this.count === this.max) { | |
controller.error(`[reach max] ${this.max}`); | |
} else { | |
console.log(`[write ${this.name} ${++this.count}]: ${chunk}`); | |
} | |
} | |
async close(controller) { | |
console.log(`[close ${this.name}]`); | |
} | |
async abort(reason) { | |
console.log(`[abort ${this.name}]: ${reason}`); | |
} | |
} | |
// non-locked pipeTo `Writer` for parallel read-to-write from multi RS | |
function pipeToWriter( | |
rs, w, {preventClose, preventCancel, preventAbort} = {}) { | |
let wactive = true; | |
const r = rs.getReader(); | |
const rclosed = r.closed.then(value => { | |
if (!preventClose && wactive) { | |
wactive = false; | |
return w.close().then(_ => value); | |
} | |
return value; | |
}, error => { | |
if (!preventClose && wactive) { | |
wactive = false; | |
return w.close().then(_ => {throw error;}); | |
} | |
throw error; | |
}); | |
const wclosed = w.closed.then(value => { | |
wactive = false; | |
if (!preventCancel) return r.cancel(). | |
then(_ => r.releaseLock()).then(_ => value); | |
return r.releaseLock().then(_ => value); | |
}, reason => { | |
wactive = false; | |
if (!preventCancel) return r.cancel(reason). | |
then(_ => r.releaseLock()).then(_ => {throw reason;}); | |
return r.reseaseLock().then(_ => {throw reason;}); | |
}); | |
function onReadError(error) { | |
if (!preventAbort) w.abort(error); | |
return {value: undefined, done: true}; | |
} | |
async function loop() { | |
while (true) { | |
await w.ready; | |
const {value, done} = await r.read().catch(onReadError); | |
if (done) break; | |
await w.write(value); | |
} | |
} | |
return Promise.race([loop().catch(err => {}), rclosed, wclosed]); | |
} | |
(async () => { | |
//* case 1: sequential two stream with pipeTo() | |
console.log("X: A1, A2, A3, A4, A5, A6, A7, A8, B1, B2"); | |
const rsA = new ReadableStream(new Producer("A", 8)); | |
const rsB = new ReadableStream(new Producer("B", 8)); | |
const wsX = new WritableStream(new Consumer("X", 10)); | |
// pipeTo locks the writer, so next pipeTo required to await to finish | |
await rsA.pipeTo(wsX, {preventClose: true}); | |
//NOTE: pipeTo fails when writor error | |
await rsB.pipeTo(wsX).catch(err => {}); // write until "B 2" | |
console.log(); | |
//*/ | |
//* case 2: parallel | |
console.log("Y: C1, D1, C2, D2, C3, D3, C4, D4, C5, D5"); | |
const rsC = new ReadableStream(new Producer("C", 8)); | |
const rsD = new ReadableStream(new Producer("D", 8)); | |
const wsY = new WritableStream(new Consumer("Y", 10)); | |
const wY = wsY.getWriter(); | |
await Promise.all([pipeToWriter(rsC, wY), pipeToWriter(rsD, wY)]).catch(console.error); | |
console.log(); | |
//*/ | |
//* case 3: merge single RS with TransformStream as queue | |
console.log("Z: E1, F1, E2, F2, E3, F3, E4, F4, E5, F5"); | |
const rsE = new ReadableStream(new Producer("E", 8)); | |
const rsF = new ReadableStream(new Producer("F", 8)); | |
const ts = new TransformStream(); | |
const tsw = ts.writable.getWriter(); | |
pipeToWriter(rsE, tsw, {preventClose: true}); | |
pipeToWriter(rsF, tsw, {preventClose: true}); | |
const wsZ = new WritableStream(new Consumer("Z", 10)); | |
await ts.readable.pipeTo(wsZ).catch(err => {}); | |
console.log(); | |
//*/ | |
})().catch(console.error); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This comment has been minimized.
My updated submodules and devDependencies version:
npm i bellbind/web-streams-polyfill
(it takes much long time to pull recursive submodules)