Skip to content

Instantly share code, notes, and snippets.

@miyaokamarina
Last active August 16, 2019 07:41
Show Gist options
  • Save miyaokamarina/52e75c105a83b8104634b02ffe44609c to your computer and use it in GitHub Desktop.
Save miyaokamarina/52e75c105a83b8104634b02ffe44609c to your computer and use it in GitHub Desktop.
Asynchronous generators multiplexer for TypeScript/JavaScript

Asynchronous generators mutiplexer

This function allows you to multiplex multiple asynchronous generators iterables into one.

Items from all iterables will be streamed to resulting one as soon as they come from source iterable.

And last but not least, you can add new iterables to multiplexer after it was created!

Example:

const gen = <a>(as: readonly a[], name: string, pause: number) => async function* () {
    await wait(pause);

    for (const a of as) {
        yield `${name}: ${a}`;

        await wait(100);
    }
};

const foo = gen(['1', '2', '3', '4', '5', '6'], 'foo', 100);
const bar = gen(['a', 'b', 'c', 'd', 'e', 'f'], 'bar', 50);
const baz = gen(['Q', 'W', 'E', 'R', 'T', 'Y'], 'baz', 0);

const multiplexed = agm(foo(), bar());

(async () => {
    for await (const x of multiplexed) {
        console.log(x);
    }
})();

multiplexed(baz());

// Output:
//
// 'baz: Q'
// 'bar: a'
// 'foo: 1'
// 'baz: W'
// 'bar: b'
// 'foo: 2'
// 'baz: E'
// 'bar: c'
// 'foo: 3'
// 'baz: R'
// 'bar: d'
// 'foo: 4'
// 'baz: T'
// 'bar: e'
// 'foo: 5'
// 'baz: Y'
// 'bar: f'
// 'foo: 6'

License

MIT

/**
* @author Marina Miyaoka <miyaokamarina@gmail.com> (https://twitter.com/miyaokamarina)
* @license MIT
*/
export interface Agm<a> extends AsyncIterableIterator<a> {
(a: AsyncIterable<a>): Agm<a>;
}
const handle = <a>(a: AsyncIterator<a>, emit: (r: IteratorResult<a>) => void) => {
a.next()
.then(emit)
.catch(() => void 0);
};
const register = <a>(listeners: ReadonlySet<(r: IteratorResult<a>) => unknown>) => (a: AsyncIterable<a>) => {
const i = a[Symbol.asyncIterator]();
const emit = (r: IteratorResult<a>) => {
if (!r.done) {
handle(i, emit);
listeners.forEach(f => f(r));
}
};
handle(i, emit);
};
export function agm<a>(...as: readonly AsyncIterable<a>[]): Agm<a> {
const listeners = new Set<(r: IteratorResult<a>) => unknown>();
const once = (f: (r: IteratorResult<a>) => unknown) => listeners.add(f);
as.forEach(register(listeners));
const agm = (a: AsyncIterable<a>) => {
register(listeners)(a);
return agm;
};
// tslint:disable-next-line promise-function-async
agm.next = () => {
return new Promise<IteratorResult<a>>(resolve => {
const listener = (r: IteratorResult<a>) => {
listeners.delete(listener);
resolve(r);
};
once(listener);
});
};
(agm as Agm<a>)[Symbol.asyncIterator] = () => agm as Agm<a>;
return agm as Agm<a>;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment