Skip to content

Instantly share code, notes, and snippets.

@safareli
Last active June 9, 2021 17:03
Show Gist options
  • Save safareli/a236af3ea4efb9ebf6ef7070f96b74b6 to your computer and use it in GitHub Desktop.
Save safareli/a236af3ea4efb9ebf6ef7070f96b74b6 to your computer and use it in GitHub Desktop.
Transformer by async generator (this might not work)
import { Duplex, ReadableOptions } from "stream";
export const createDeferredPromise = <T>() => {
const res = {
resolve: (_val: T) => {},
reject: (_reason?: unknown) => {},
promise: new Promise<T>((resolve, reject) => {
res.resolve = resolve;
res.reject = reject;
}),
};
return res;
};
type InputGen = AsyncGenerator<{ chunk: unknown; encoding: unknown }, void>;
export function transformBy(
asyncGeneratorFn: (input: InputGen) => AsyncGenerator<unknown, void>,
options: ReadableOptions
) {
let { promise, resolve } = createDeferredPromise<
| { done: true; cb: () => void }
| { done: false; chunk: unknown; encoding: string; cb: () => void }
>();
const asyncGenerator: InputGen = (async function* () {
while (true) {
const res = await promise;
process.nextTick(res.cb);
if (res.done) {
return;
}
yield { chunk: res.chunk, encoding: res.encoding };
({ promise, resolve } = createDeferredPromise());
}
})();
const write = (chunk: unknown, encoding: string, cb: () => void) => {
resolve({ chunk, done: false, encoding, cb });
};
const final = (cb: () => void) => {
resolve({ done: true, cb });
};
const optionsDuplex = {
objectMode: true,
autoDestroy: true,
...options,
};
Object.assign(optionsDuplex, final, write);
return Duplex.from(asyncGeneratorFn(asyncGenerator), optionsDuplex);
}
@safareli
Copy link
Author

safareli commented Jun 9, 2021

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment