Created
June 12, 2020 06:14
-
-
Save bellbind/7c9954a5e790f802259b0f12e8a0f976 to your computer and use it in GitHub Desktop.
[JavaScript][deno]Web Streams API basic examples with `highWaterMark`
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Web Streams API basic example for highWaterMark option | |
// 1. raw ReadableStream pattern | |
const rawReader = async () => { | |
const blob = new Blob(["Hello ", "World"]); | |
const rstream = blob.stream(); | |
const reader = rstream.getReader(); | |
try { | |
for (let {value, done} = await reader.read(); !done; | |
{value, done} = await reader.read()) { | |
console.log("[raw reader]", value); | |
} | |
} finally { | |
reader.releaseLock(); //IMPORTANT | |
} | |
}; | |
await rawReader().catch(console.error); | |
// 2. asyncIterator wrapper for ReadableStream | |
const asyncIteratorRS = async function* (rs) { | |
const reader = rs.getReader(); | |
try { | |
for (let {value, done} = await reader.read(); !done; | |
{value, done} = await reader.read()) { | |
yield value; | |
} | |
} finally { | |
reader.releaseLock(); | |
} | |
}; | |
const wrappedReader = async () => { | |
const blob = new Blob(["Hello ", "World"]); | |
const rstream = blob.stream(); | |
for await (const value of asyncIteratorRS(rstream)) { | |
console.log("[wrapped reader]", value); | |
} | |
}; | |
await wrappedReader().catch(console.error); | |
// 3. raw WritableStream | |
const slowSink = (delay = 100) => { | |
return { | |
async write(chunk, controller) { | |
console.log(chunk); | |
await new Promise(f => setTimeout(f, delay)); | |
}, | |
}; | |
}; | |
const rawWriter = async () => { | |
const data = ["Hello ", "World"]; | |
const wstream = new WritableStream(slowSink()); | |
const writer = wstream.getWriter(); | |
try { | |
for (const chunk of data) { | |
await writer.write(chunk); | |
} | |
} finally { | |
writer.releaseLock(); | |
} | |
}; | |
await rawWriter().catch(console.error); | |
// 4. writer with highWaterMark > 1 | |
const queuingWriter = async () => { | |
// NOTE: WritableStream has growing buffer to write, so | |
// programmers code for "how long waiting to write". | |
// WritableStream's `higWaterMark` option is a length to waiting: | |
// - `await writer.ready` waits until waiting length >= highWaterMark | |
const data = [1,2,3,4,5,6,7,8]; | |
const strategy = {highWaterMark: 3}; // waiting length for writer.ready | |
const wstream = new WritableStream(slowSink(), strategy); | |
const writer = wstream.getWriter(); | |
try { | |
for (const chunk of data) { | |
//await writer.write(chunk); // wait finished each write | |
await writer.ready; // wait until waiting size < highWaterMark | |
writer.write(chunk); // not wait finished each write | |
console.log("write", chunk); | |
} | |
} finally { | |
writer.releaseLock(); | |
} | |
// wstream.close waits after buffer flushed | |
await wstream.close(); | |
}; | |
await queuingWriter().catch(console.error); | |
// Readable source with highWaterMark | |
const readableSource = (array) => { | |
const itor = array[Symbol.iterator](); | |
return { | |
async pull(controller) { | |
const {value, done} = itor.next(); | |
if (done) { | |
controller.close(); | |
} else { | |
controller.enqueue(value); | |
console.log("pull", value); | |
} | |
}, | |
}; | |
}; | |
const queuingReader = async () => { | |
// NOTE: ReadableStream's highWaterMark is wait length to call source.pull() | |
// - call source.pull() when waiting length < highWaterMark | |
const data = [1,2,3,4,5,6,7,8]; | |
const strategy = {highWaterMark: 3}; // waiting length for controller.enqueue | |
const rstream = new ReadableStream(readableSource(data), strategy); | |
const reader = rstream.getReader(); | |
try { | |
for (let {value, done} = await reader.read(); !done; | |
{value, done} = await reader.read()) { | |
console.log(value); | |
await new Promise(f => setTimeout(f, 100)); | |
} | |
} finally { | |
reader.releaseLock(); | |
} | |
}; | |
await queuingReader().catch(console.error); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
run with deno as: