@MattiasBuelens
Last active November 30, 2023 02:14
Teeing an async iterator
/**
 * Like ReadableStream.tee(), but for any async iterable.
 * https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/tee
 *
 * See: MattiasBuelens/web-streams-polyfill#80
 */
/// <reference lib="esnext" />
interface QueueNode<T> {
    value: T;
    next: QueueNode<T> | undefined;
}
function teeAsyncIterator<T>(iterable: AsyncIterable<T>): [AsyncIterable<T>, AsyncIterable<T>] {
    const source = iterable[Symbol.asyncIterator]();
    // A linked list of enqueued chunks. (The first node has no value.)
    let queue: QueueNode<T> = {
        value: undefined!,
        next: undefined
    };
    // Which branches have already been closed.
    let closed: [boolean, boolean] = [false, false];
    // Whether we're currently reading from the source.
    let reading = false;
    // Whether the source stream has closed.
    let done = false;
    // A promise for the current read (if reading is true).
    let currentRead: Promise<void> | undefined;
    async function next(): Promise<void> {
        reading = true;
        const result = await source.next();
        if (result.done) {
            done = true;
        } else {
            const nextNode: QueueNode<T> = {
                value: result.value,
                next: undefined
            };
            queue.next = nextNode;
            queue = nextNode;
        }
        reading = false;
    }
    async function* branch(i: 0 | 1, buffer: QueueNode<T>): AsyncGenerator<T, undefined, undefined> {
        try {
            while (true) {
                if (buffer.next) {
                    buffer = buffer.next;
                    yield buffer.value;
                } else if (done) {
                    return;
                } else {
                    if (!reading) {
                        currentRead = next();
                    }
                    await currentRead;
                }
            }
        } finally {
            closed[i] = true;
            // Close source iterator if both branches are closed
            // Important: don't call return() if next() returned {done: true}!
            if (!done && closed[1 - i]) {
                await source.return?.();
            }
        }
    }
    return [
        branch(0, queue),
        branch(1, queue)
    ];
}
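
The `finally` block above only calls `source.return()` while the source has not yet reported `{done: true}`. As a rough illustration of that convention, the sketch below shows that `for await` itself behaves the same way: it calls `return()` on an early exit, but not after the iterator has finished. The `countingSource` helper and its names are hypothetical and not part of the gist.

```typescript
// Hypothetical helper: an async iterable that counts how often return() is called.
function countingSource(limit: number) {
    let returnCalls = 0;
    let i = 0;
    const iterable: AsyncIterable<number> = {
        [Symbol.asyncIterator]() {
            return {
                async next(): Promise<IteratorResult<number>> {
                    return i < limit
                        ? { done: false, value: i++ }
                        : { done: true, value: undefined };
                },
                async return(): Promise<IteratorResult<number>> {
                    returnCalls++;
                    return { done: true, value: undefined };
                }
            };
        }
    };
    return { iterable, getReturnCalls: () => returnCalls };
}

async function demo(): Promise<[number, number]> {
    // Case 1: read until next() reports {done: true}. No return() call follows.
    const exhausted = countingSource(3);
    let sum = 0;
    for await (const value of exhausted.iterable) {
        sum += value;
    }
    // Case 2: stop early. The loop closes the iterator by calling return().
    const abandoned = countingSource(3);
    for await (const value of abandoned.iterable) {
        sum += value;
        break;
    }
    return [exhausted.getReturnCalls(), abandoned.getReturnCalls()];
}
```

Calling `return()` on an iterator that has already finished is harmless for async generators, but a hand-written iterator may not expect it, which is presumably why the tee implementation guards on `done`.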
@jimmywarting

jimmywarting commented Jun 30, 2021

Sweet, not the kind of buffer branch I was hoping for. I was hoping for something along the lines of what yocto-queue did with the Node#head

...Like we discussed in the issue, I have some concerns about GC when one of the branches is forgotten and never used anymore. But I would totally use this anyway.
Looks very much like the sync iterator on SO with a few minor changes to make it async

If you come up with an object-based solution (like the yocto queue) instead of using:

const buffers: Pair<T[] | undefined> = [[], []];

... or if you add a WeakRef to it and post it on that Stack Overflow question as an answer, then I will totally give you a bounty reward for it
(Object mode instead of buffers, slice, push === higher reward)

@MattiasBuelens
Author

MattiasBuelens commented Jun 30, 2021

All right, I gave WeakRefs a try. It's far from perfect though: I only check whether the branches have been garbage collected after a successful next() call, or after one of the branches is closed with return(). There are probably some cases that I'm still missing... 😛

I don't even know if it's a good idea to rely on these iterators actually getting garbage collected in order to close the source iterator... As MDN explains, they might live longer than expected, or possibly never get collected at all. 🤷‍♂️
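
For readers who have not seen that version, here is a minimal sketch of what "check after a successful next() call" could look like. It is a guess at the general shape, not the code from the WeakRef revision: `Derefable`, `allCollected` and `readAndCheck` are hypothetical names, and the check accepts anything with a `deref()` method so that it can be exercised without depending on when the garbage collector actually runs.

```typescript
/// <reference lib="esnext" />

// Anything with a deref() method. WeakRef<T> satisfies this structurally.
interface Derefable<T extends object> {
    deref(): T | undefined;
}

// Hypothetical helper: true once every tracked branch has been collected.
function allCollected(branches: ReadonlyArray<Derefable<object>>): boolean {
    return branches.every((ref) => ref.deref() === undefined);
}

// Hypothetical helper: reads one chunk, then closes the source if no branch
// is left alive to consume it.
async function readAndCheck<T>(
    source: AsyncIterator<T>,
    branches: ReadonlyArray<Derefable<object>>
): Promise<IteratorResult<T>> {
    const result = await source.next();
    if (!result.done && allCollected(branches)) {
        await source.return?.();
        return { done: true, value: undefined };
    }
    return result;
}
```

In real use, `branches` would hold `new WeakRef(branchObject)` entries. The check can only ever observe that collection has already happened, so the source may stay open long after both branches have become unreachable.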

@MattiasBuelens
Author

MattiasBuelens commented Jun 30, 2021

There you go, now with a linked list. 😁

This version doesn't try to close the source iterator if both branches are garbage collected. But at least all buffered chunks will have been garbage collected as well.

I don't know which version has the best performance. My intuition says "linked lists are bad" and that they create more garbage than a regular linear array. But I have to admit, linked lists do make it easier to automatically throw away the unused buffered chunks.
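
To make the "automatically throw away" part concrete, here is a stripped-down, synchronous sketch of the same buffering idea. The `SharedBuffer` class and its method names are made up for illustration. The key property is that the buffer itself only references the tail, and each reader only references the last node it consumed, so any node that every reader has moved past is no longer reachable.

```typescript
interface ListNode<T> {
    value: T;
    next: ListNode<T> | undefined;
}

// Hypothetical class showing the linked-list buffer in isolation
// (no async, no source iterator).
class SharedBuffer<T> {
    // Starts as a sentinel node that carries no meaningful value.
    private tail: ListNode<T> = { value: undefined as unknown as T, next: undefined };

    push(value: T): void {
        const node: ListNode<T> = { value, next: undefined };
        this.tail.next = node;
        this.tail = node;
    }

    // Returns a function that drains everything pushed since this reader's
    // previous call (or since the reader was created).
    reader(): () => T[] {
        let cursor = this.tail;
        return () => {
            const chunks: T[] = [];
            while (cursor.next !== undefined) {
                cursor = cursor.next;
                chunks.push(cursor.value);
            }
            return chunks;
        };
    }
}
```

With two readers created up front, pushing `1, 2`, draining the first reader, and pushing `3` leaves the first reader with only `3` pending while the second still sees `1, 2, 3`. If the second reader is dropped instead of drained, the nodes it was holding on to become collectable along with it, with no explicit bookkeeping.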
