Skip to content

Instantly share code, notes, and snippets.

@runjak
Last active May 19, 2023 18:00
Show Gist options
  • Save runjak/13fa515828846eb2a0e4efae60c91e8b to your computer and use it in GitHub Desktop.
Save runjak/13fa515828846eb2a0e4efae60c91e8b to your computer and use it in GitHub Desktop.
An experiment towards async push/pull streams
type AsyncCell<T> = { value: T; next: Promise<AsyncCell<T>> };
type AsyncStreamOutput<T> = () => AsyncGenerator<unknown, T, unknown>;
type AsyncStreamInput<T> = (value: T) => unknown;
const mkPromiseAndInput = <T>(): [Promise<T>, AsyncStreamInput<T>] => {
// streamInput is assigned before return because of Promise implementation
let streamInput!: AsyncStreamInput<T>;
const promise = new Promise<T>((resolve) => {
streamInput = resolve as AsyncStreamInput<T>;
});
return [promise, streamInput];
};
const mkAsyncStream = <T>(): [AsyncStreamOutput<T>, AsyncStreamInput<T>] => {
let [currentCellPromise, resolveCurrentCell] = mkPromiseAndInput<
AsyncCell<T>
>();
const streamInput: AsyncStreamInput<T> = (value: T) => {
const resolvePreviousCell = resolveCurrentCell;
[currentCellPromise, resolveCurrentCell] = mkPromiseAndInput<
AsyncCell<T>
>();
resolvePreviousCell({ value, next: currentCellPromise });
};
const streamOutput = async function* () {
let currentCell = await currentCellPromise;
while (true) {
yield currentCell.value;
currentCell = await currentCell.next;
}
};
return [streamOutput, streamInput];
};
@runjak
Copy link
Author

runjak commented May 19, 2023

Wrote a bit more text on it here:
https://www.runjak.codes/posts/2022-08-06-async-stream

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment