Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
[JavaScript][deno]Web Streams API basic examples with `highWaterMark`
// Web Streams API basic example for highWaterMark option
// 1. raw ReadableStream pattern
const rawReader = async () => {
const blob = new Blob(["Hello ", "World"]);
const rstream = blob.stream();
const reader = rstream.getReader();
try {
for (let {value, done} = await reader.read(); !done;
{value, done} = await reader.read()) {
console.log("[raw reader]", value);
}
} finally {
reader.releaseLock(); //IMPORTANT
}
};
await rawReader().catch(console.error);
// 2. asyncIterator wrapper for ReadableStream
const asyncIteratorRS = async function* (rs) {
const reader = rs.getReader();
try {
for (let {value, done} = await reader.read(); !done;
{value, done} = await reader.read()) {
yield value;
}
} finally {
reader.releaseLock();
}
};
const wrappedReader = async () => {
const blob = new Blob(["Hello ", "World"]);
const rstream = blob.stream();
for await (const value of asyncIteratorRS(rstream)) {
console.log("[wrapped reader]", value);
}
};
await wrappedReader().catch(console.error);
// 3. raw WritableStream
const slowSink = (delay = 100) => {
return {
async write(chunk, controller) {
console.log(chunk);
await new Promise(f => setTimeout(f, delay));
},
};
};
const rawWriter = async () => {
const data = ["Hello ", "World"];
const wstream = new WritableStream(slowSink());
const writer = wstream.getWriter();
try {
for (const chunk of data) {
await writer.write(chunk);
}
} finally {
writer.releaseLock();
}
};
await rawWriter().catch(console.error);
// 4. writer with highWaterMark > 1
const queuingWriter = async () => {
// NOTE: WritableStream has growing buffer to write, so
// programmers code for "how long waiting to write".
// WritableStream's `higWaterMark` option is a length to waiting:
// - `await writer.ready` waits until waiting length >= highWaterMark
const data = [1,2,3,4,5,6,7,8];
const strategy = {highWaterMark: 3}; // waiting length for writer.ready
const wstream = new WritableStream(slowSink(), strategy);
const writer = wstream.getWriter();
try {
for (const chunk of data) {
//await writer.write(chunk); // wait finished each write
await writer.ready; // wait until waiting size < highWaterMark
writer.write(chunk); // not wait finished each write
console.log("write", chunk);
}
} finally {
writer.releaseLock();
}
// wstream.close waits after buffer flushed
await wstream.close();
};
await queuingWriter().catch(console.error);
// Readable source with highWaterMark
const readableSource = (array) => {
const itor = array[Symbol.iterator]();
return {
async pull(controller) {
const {value, done} = itor.next();
if (done) {
controller.close();
} else {
controller.enqueue(value);
console.log("pull", value);
}
},
};
};
const queuingReader = async () => {
// NOTE: ReadableStream's highWaterMark is wait length to call source.pull()
// - call source.pull() when waiting length < highWaterMark
const data = [1,2,3,4,5,6,7,8];
const strategy = {highWaterMark: 3}; // waiting length for controller.enqueue
const rstream = new ReadableStream(readableSource(data), strategy);
const reader = rstream.getReader();
try {
for (let {value, done} = await reader.read(); !done;
{value, done} = await reader.read()) {
console.log(value);
await new Promise(f => setTimeout(f, 100));
}
} finally {
reader.releaseLock();
}
};
await queuingReader().catch(console.error);
@bellbind
Copy link
Author

bellbind commented Jun 12, 2020

run with deno as:

$ deno run https://gist.githack.com/bellbind/7c9954a5e790f802259b0f12e8a0f976/raw/web-streams-examples.js
[raw reader] Uint8Array(11) [
  72, 101, 108, 108,
  111, 32, 87, 111,
  114, 108, 100
]
[wrapped reader] Uint8Array(11) [
  72, 101, 108, 108,
  111, 32, 87, 111,
  114, 108, 100
]
Hello
World
1
write 1
write 2
write 3
2
write 4
3
write 5
4
write 6
5
write 7
6
write 8
7
8
pull 1
1
pull 2
pull 3
pull 4
pull 5
2
pull 6
3
pull 7
4
pull 8
5
6
7
8
$

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment