Skip to content

Instantly share code, notes, and snippets.

@frangio
Created November 9, 2022 00:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save frangio/ad10e1c7389c4149c7b1c1186c135f98 to your computer and use it in GitHub Desktop.
Save frangio/ad10e1c7389c4149c7b1c1186c135f98 to your computer and use it in GitHub Desktop.
export async function* chain<T, U>(g: AsyncGenerator<T, void>, f: (x: T) => AsyncGenerator<U, void>): AsyncGenerator<U, void> {
type R<I> = IteratorResult<I extends number ? U : T, void>;
type IR<I> = [I, R<I>];
const k = <I>(i: I) => (res: R<I>): IR<I> => [i, res];
const running = new Set<AsyncGenerator<unknown, void>>([g]);
let p = g.next().then(k(null));
const ps: Promise<IR<number>>[] = [];
const gs: AsyncGenerator<U, void>[] = [];
const queue = new Set<Promise<IR<null> | IR<number>>>([p]);
try {
while (queue.size > 0) {
const [i, res] = await Promise.race(queue);
queue.delete(i === null ? p : ps[i]!);
if (res.done) {
running.delete(i === null ? g : gs[i]!);
} else if (i === null) {
const j = gs.length;
const gsj = f(res.value);
gs.push(gsj);
running.add(gsj);
const psj = gsj.next().then(k(j));
ps.push(psj);
queue.add(psj);
p = g.next().then(k(null));
queue.add(p);
} else {
const psi = ps[i] = gs[i]!.next().then(k(i));
queue.add(psi);
yield res.value;
}
}
} finally {
await Promise.all(Array.from(running, g => g.return()));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment