@domenic
Last active August 29, 2015 14:15
Readable byte stream use cases comparison

Inspired by a comment on whatwg/streams#253, but substantially updated since.

Use cases:

  • (all) Want to read a 10 MiB file into a single ArrayBuffer
  • (chunkwise) Want to read a 10 MiB file, 1 MiB at a time, re-using a single 1 MiB ArrayBuffer, and calling processChunk on each chunk before reading the next one
  • (ping-pong) Want to read a 10 MiB file, 1 MiB at a time, re-using two separate 1 MiB ArrayBuffers, because you are asynchronously processing (processChunk) the 1 MiB chunks and want to get a potential 1 MiB headstart in parallel with the async processing

Note: we're using async/await syntax (proposed for ES2016 when this was written, ultimately standardized in ES2017) so that we can write plain loops instead of recursing with promises. This shouldn't really bias the comparison, as all of these designs involve the same number of promises.
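As a rough illustration of that note, here is the same ten-step read sequence written both ways. fakeRead is a hypothetical stand-in for any promise-returning read and is not part of any of the designs compared here.

```javascript
// Hypothetical stand-in for a promise-returning read; resolves with the chunk index.
function fakeRead(i) {
  return Promise.resolve(i);
}

// Promise recursion: each step schedules the next one from inside then().
function readAllRecursive(count, i = 0, seen = []) {
  if (i >= count) {
    return Promise.resolve(seen);
  }
  return fakeRead(i).then(chunk => {
    seen.push(chunk);
    return readAllRecursive(count, i + 1, seen);
  });
}

// async/await: the same sequence expressed as a plain loop.
async function readAllLoop(count) {
  const seen = [];
  for (let i = 0; i < count; ++i) {
    seen.push(await fakeRead(i));
  }
  return seen;
}
```

Both functions resolve with the chunks in the same order; the loop version just keeps the control flow flat.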

Summary:

  • async-readinto-no-transfer.js was my initial idea, which causes observable data races and so is a no-go.
  • setallocator.js was based on a design of @wanderview, but ends up being awkward due to the pause/resume semantics that need to be added. It also has observable data races as written; I haven't yet tried writing an updated version without them.
  • feed.js was based on an idea of @tyoshino.
  • wait(ab).js is a simplification of feed.js and is where I've landed for now.

Also of note: there is an idea I didn't write out here, namely an async read(sourceAB, offset, bytesDesired) -> Promise<{ result, bytesRead }>, where result is a transfer of sourceAB. It is a bit more powerful in that it allows multiple reads into the same pre-allocated backing memory (albeit at the cost of a temporary variable that ends up holding a detached array buffer after each read). However, it is awkward enough that I don't think it's really worth considering.
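For the curious, here is a minimal sketch of what that read() shape might look like against a mock. Everything in it is hypothetical: MockTransferReader, its fake byte pattern, and the use of structuredClone with a transfer list to stand in for the transfer step are assumptions for illustration, not part of any proposed API. It assumes a runtime that provides structuredClone (recent browsers or Node.js 17+).

```javascript
// Hypothetical mock, not a real stream: serves bytes 0, 1, 2, ... (mod 256) from a fake file.
class MockTransferReader {
  constructor(totalBytes) {
    this.totalBytes = totalBytes;
    this.position = 0;
  }

  // Transfers sourceAB (detaching it), fills up to bytesDesired bytes starting
  // at offset, and resolves with the new buffer plus the number of bytes written.
  async read(sourceAB, offset, bytesDesired) {
    // structuredClone with a transfer list detaches sourceAB and hands back
    // a new ArrayBuffer object carrying the same contents.
    const result = structuredClone(sourceAB, { transfer: [sourceAB] });
    const remaining = this.totalBytes - this.position;
    const bytesRead = Math.min(bytesDesired, remaining, result.byteLength - offset);
    const view = new Uint8Array(result, offset, bytesRead);
    for (let i = 0; i < bytesRead; ++i) {
      view[i] = (this.position + i) % 256;
    }
    this.position += bytesRead;
    return { result, bytesRead };
  }
}

// Two reads into the same pre-allocated memory, at different offsets.
async function readTwice(reader) {
  let ab = new ArrayBuffer(8);
  const first = await reader.read(ab, 0, 4);
  // ab is now detached; first.result carries the memory forward.
  const detachedLength = ab.byteLength;
  ab = first.result;
  const second = await reader.read(ab, 4, 4);
  return { bytes: Array.from(new Uint8Array(second.result)), detachedLength };
}
```

The awkwardness shows up in readTwice: every call leaves the caller's variable pointing at a detached buffer, so it has to be reassigned from the result before the next read.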

// async-readinto-no-transfer.js
// NB: this approach is a no-go since it creates observable data races (array buffers being filled in by another thread)
// Common prelude
const ONE_MIB = 1024 * 1024;
const TEN_MIB = 10 * ONE_MIB;
async function all(rbs) {
  const ab = new ArrayBuffer(TEN_MIB);
  await rbs.readInto(ab, 0, ab.byteLength);
  return ab;
}
async function chunkwise(rbs, processChunk) {
  const ab = new ArrayBuffer(ONE_MIB);
  // The counter must be declared with let; a const binding cannot be incremented.
  for (let i = 0; i < 10; ++i) {
    await rbs.readInto(ab, 0, ONE_MIB);
    await processChunk(ab);
  }
}
async function pingPong(rbs, processChunk) {
  const abs = [new ArrayBuffer(ONE_MIB), new ArrayBuffer(ONE_MIB)];
  let processingPromise;
  for (let i = 0; i < 10; ++i) {
    const dest = abs[i % 2];
    // Each destination buffer is only ONE_MIB long, so always fill it from offset 0.
    const readPromise = rbs.readInto(dest, 0, ONE_MIB);
    await Promise.all([readPromise, processingPromise]);
    processingPromise = processChunk(dest);
  }
  // Wait for the final chunk to finish processing before returning.
  await processingPromise;

  // i = 0:
  //   Read chunk 0 into abs[0]
  //   Await reading of chunk 0
  //   Process chunk 0
  // i = 1:
  //   Read chunk 1 into abs[1]
  //   Await (reading of chunk 1, processing of chunk 0)
  //   Process chunk 1
  // i = 2:
  //   Read chunk 2 into abs[0]
  //   Await (reading of chunk 2, processing of chunk 1)
  //   Process chunk 2
}
// feed.js
// Common prelude
const ONE_MIB = 1024 * 1024;
const TEN_MIB = 10 * ONE_MIB;
async function all(rbs) {
  const ab = new ArrayBuffer(TEN_MIB);
  // Detaches ab, transferring it to newAB
  // Begins the fread(newAB, 0, newAB.byteLength /* === TEN_MIB */) call
  rbs.feed(ab);
  await rbs.ready; // fulfills when the fread() call finishes
  return rbs.read(); // return value will be using the same backing memory as ab, but not ===
}
async function chunkwise(rbs, processChunk) {
  const ab = new ArrayBuffer(ONE_MIB);
  // Detaches ab, transferring it to newAB
  // Begins the fread(newAB, 0, newAB.byteLength /* === ONE_MIB */) call
  rbs.feed(ab);
  for (let i = 0; i < 10; ++i) {
    await rbs.ready; // fulfills when the most recently started fread() call finishes
    const result = rbs.read(); // return value is newAB, i.e. not === ab
    // Right now nothing is happening: even though the stream is empty, it has no buffer to read into.
    await processChunk(result);
    // Now we can feed result back to the stream.
    // This detaches result, transferring it to newAB2, and begins the fread(newAB2, (i + 1) * ONE_MIB, ONE_MIB)
    rbs.feed(result);
  }
}
async function pingPong(rbs, processChunk) {
  const pool = [new ArrayBuffer(ONE_MIB), new ArrayBuffer(ONE_MIB)];
  rbs.feed(pool[0]);
  await rbs.ready;
  for (let i = 0; i < 10; ++i) {
    await Promise.all([
      process(i % 2),
      read((i + 1) % 2)
    ]);
  }

  // Both helpers use await, so they must be declared async.
  async function process(i) {
    const chunk = rbs.read();
    await processChunk(chunk);
    pool[i] = chunk;
  }
  async function read(i) {
    rbs.feed(pool[i]);
    await rbs.ready;
  }

  // Before the loop: feed causes read chunk 0 into pool[0]
  //   await rbs.ready to know when that's finished
  // i = 0: in parallel,
  //   process(0)
  //     - Read and process chunk 0
  //     - when done, return the backing memory to the pool as pool[0]
  //   read(1)
  //     - Feed pool[1], causing read of chunk 1 into it
  //     - await rbs.ready to know when it's finished
  // i = 1: in parallel,
  //   process(1)
  //     - Read and process chunk 1
  //     - when done, return the backing memory to the pool as pool[1]
  //   read(0)
  //     - Feed pool[0], causing read of chunk 2 into it
  //     - await rbs.ready to know when it's finished
}
// setallocator.js
// Common prelude
const ONE_MIB = 1024 * 1024;
const TEN_MIB = 10 * ONE_MIB;
async function all(rbs) {
  const ab = new ArrayBuffer(TEN_MIB);
  // Begins the fread(ab, 0, ab.byteLength /* === TEN_MIB */) call
  rbs.setAllocator(() => ab);
  await rbs.ready; // fulfills when the fread() call finishes
  return rbs.read(); // return value will be === to ab
}
async function chunkwise(rbs, processChunk) {
  const ab = new ArrayBuffer(ONE_MIB);
  // Begins the fread(ab, 0, ab.byteLength /* === ONE_MIB */) call
  rbs.setAllocator(() => ab);
  for (let i = 0; i < 10; ++i) {
    await rbs.ready; // fulfills when the most recently started fread() call finishes
    rbs.read(); // return value is === ab again
    // The call to rbs.read() emptied the queue, automatically kicking off
    // fread(ab, (i + 1) * ONE_MIB, ONE_MIB)
    // This is bad! Our `ab` is about to be overwritten!
    // ... So we need some way to suppress further reads?? Either that or make a copy.
    rbs.pause(); // ???
    await processChunk(ab);
    rbs.resume(); // no fun
  }
}
async function pingPong(rbs, processChunk) {
  const abs = [new ArrayBuffer(ONE_MIB), new ArrayBuffer(ONE_MIB)];
  let counter = 0;
  // Begins the fread(abs[0], 0, abs[0].byteLength /* === ONE_MIB */) call
  rbs.setAllocator(() => abs[counter++ % 2]);
  await rbs.ready;
  for (let i = 0; i < 10; ++i) {
    await Promise.all([
      rbs.ready.then(() => rbs.pause()),
      processChunk(rbs.read())
    ])
    .then(() => rbs.resume());
  }

  // Before the loop: setAllocator causes read chunk 0 into abs[0]
  //   await rbs.ready to know when that's finished
  //   At that point reading chunk 1 into abs[1] starts
  // i = 0:
  //   Process chunk 0
  //   Await (reading of chunk 1, processing of chunk 0)
  //   However if reading of chunk 1 finishes before processing of chunk 0,
  //   we're in trouble, since it will automatically trigger reading into abs[0],
  //   which is still being used by the chunk processor. So, we have to
  //   immediately pause the flow once reading chunk 1 finishes, then resume
  //   after both operations finish.
}
// wait(ab).js
// Common prelude
const ONE_MIB = 1024 * 1024;
const TEN_MIB = 10 * ONE_MIB;
async function all(rbs) {
  const ab = new ArrayBuffer(TEN_MIB);
  // Detaches ab, transferring it to newAB
  // Begins the fread(newAB, 0, newAB.byteLength /* === TEN_MIB */) call
  await rbs.wait(ab);
  return rbs.read(); // return value will be using the same backing memory as ab, but not ===
}
async function chunkwise(rbs, processChunk) {
  let current = new ArrayBuffer(ONE_MIB);
  for (let i = 0; i < 10; ++i) {
    // Detaches current, transferring it to newAB
    // Begins the fread(newAB, 0, newAB.byteLength /* === ONE_MIB */) call
    await rbs.wait(current); // fulfills when that fread(newAB, 0, ONE_MIB) call finishes
    current = rbs.read(); // return value is newAB, i.e. not === the buffer we passed to wait()
    // Right now nothing is happening: even though the stream is empty, it has no buffer to read into.
    await processChunk(current);
  }
}
async function pingPong(rbs, processChunk) {
  const pool = [new ArrayBuffer(ONE_MIB), new ArrayBuffer(ONE_MIB)];
  await rbs.wait(pool[0]);
  for (let i = 0; i < 10; ++i) {
    // Pass the wait() promise itself to Promise.all; awaiting it inline
    // would settle it before Promise.all ever sees it.
    await Promise.all([
      process(i % 2),
      rbs.wait(pool[(i + 1) % 2])
    ]);
  }

  // process uses await, so it must be declared async.
  async function process(i) {
    const chunk = rbs.read();
    await processChunk(chunk);
    pool[i] = chunk;
  }

  // Before the loop: wait(pool[0]) causes read of chunk 0 into pool[0]
  //   and fulfills when that's finished
  // i = 0: in parallel,
  //   process(0)
  //     - Read and process chunk 0
  //     - when done, return the backing memory to the pool as pool[0]
  //   rbs.wait(pool[1])
  //     - Transfers pool[1], causing read of chunk 1 into it
  //     - fulfills when it's finished
  // i = 1: in parallel,
  //   process(1)
  //     - Read and process chunk 1
  //     - when done, return the backing memory to the pool as pool[1]
  //   rbs.wait(pool[0])
  //     - Transfers pool[0], causing read of chunk 2 into it
  //     - fulfills when it's finished
}
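To try the wait(ab).js shape without a real stream, a mock along the following lines might do. This is only a sketch under stated assumptions: MockWaitStream is hypothetical, it fills each buffer with the chunk index instead of reading a file, and it uses structuredClone with a transfer list (available in recent browsers and Node.js 17+) to imitate the detach-on-transfer step.

```javascript
const ONE_MIB = 1024 * 1024;

// Hypothetical mock of the wait(ab)/read() shape; fills each buffer with the chunk index.
class MockWaitStream {
  constructor() {
    this.chunkIndex = 0;
    this.filled = null;
  }

  // Takes ownership of ab (detaching the caller's reference), "reads" into it,
  // and fulfills once the data is available via read().
  async wait(ab) {
    const owned = structuredClone(ab, { transfer: [ab] });
    new Uint8Array(owned).fill(this.chunkIndex++);
    this.filled = owned;
  }

  // Hands the filled buffer back to the caller.
  read() {
    const result = this.filled;
    this.filled = null;
    return result;
  }
}

// Same shape as the wait(ab).js chunkwise function above, with the chunk count
// made a parameter for convenience.
async function chunkwise(rbs, processChunk, chunkCount = 10) {
  let current = new ArrayBuffer(ONE_MIB);
  for (let i = 0; i < chunkCount; ++i) {
    await rbs.wait(current);
    current = rbs.read();
    await processChunk(current);
  }
}
```

Calling chunkwise(new MockWaitStream(), async ab => { ... }) hands the callback one 1 MiB buffer per chunk; the buffer passed to wait() is detached each time, so only the value returned by read() is usable afterwards.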
@tyoshino

The semantics of readInto in this example is that it fulfills the returned promise only when the specified region is fully filled? Or details are omitted for simplicity of examples?

In chunkwise, readInto() is invoked with (ab, 1MiB, 1MiB), (ab, 2MiB, 1MiB), ... But ab is ONE_MIB long. Some typo?

@domenic
Author

domenic commented Feb 25, 2015

@tyoshino sorry, there are no notifications for gist comments so I didn't see this.

> The semantics of readInto in this example is that it fulfills the returned promise only when the specified region is fully filled? Or details are omitted for simplicity of examples?

My thought was that yes, we would wait. But maybe that is not so good for epoll/select streams.

> In chunkwise, readInto() is invoked with (ab, 1MiB, 1MiB), (ab, 2MiB, 1MiB), ... But ab is ONE_MIB long. Some typo?

Yes, fixing.

@tyoshino

tyoshino commented Mar 5, 2015

Oh, I didn't know that. I'll make comments on the issue pointing to the gist from next time.
