Skip to content

Instantly share code, notes, and snippets.

@runjak
Last active May 19, 2023 18:00
Show Gist options
  • Save runjak/13fa515828846eb2a0e4efae60c91e8b to your computer and use it in GitHub Desktop.
Save runjak/13fa515828846eb2a0e4efae60c91e8b to your computer and use it in GitHub Desktop.
An experiment towards async push/pull streams
type AsyncCell<T> = { value: T; next: Promise<AsyncCell<T>> };
type AsyncStreamOutput<T> = () => AsyncGenerator<unknown, T, unknown>;
type AsyncStreamInput<T> = (value: T) => unknown;
const mkPromiseAndInput = <T>(): [Promise<T>, AsyncStreamInput<T>] => {
// streamInput is assigned before return because of Promise implementation
let streamInput!: AsyncStreamInput<T>;
const promise = new Promise<T>((resolve) => {
streamInput = resolve as AsyncStreamInput<T>;
});
return [promise, streamInput];
};
const mkAsyncStream = <T>(): [AsyncStreamOutput<T>, AsyncStreamInput<T>] => {
let [currentCellPromise, resolveCurrentCell] = mkPromiseAndInput<
AsyncCell<T>
>();
const streamInput: AsyncStreamInput<T> = (value: T) => {
const resolvePreviousCell = resolveCurrentCell;
[currentCellPromise, resolveCurrentCell] = mkPromiseAndInput<
AsyncCell<T>
>();
resolvePreviousCell({ value, next: currentCellPromise });
};
const streamOutput = async function* () {
let currentCell = await currentCellPromise;
while (true) {
yield currentCell.value;
currentCell = await currentCell.next;
}
};
return [streamOutput, streamInput];
};
@runjak
Copy link
Author

runjak commented Apr 7, 2022

Constraints:

  1. Have an input where some events can be pushed into at any time in the future.
  2. Support streaming such events using and async generator.
  3. Support multiple async generator streams at a time. Support the amount of streams to be changing.
  4. Don't accumulate events in storage if no generator is waiting for them.

@runjak
Copy link
Author

runjak commented May 19, 2023

Wrote a bit more text on it here:
https://www.runjak.codes/posts/2022-08-06-async-stream

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment