Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import { identity } from '../internal/identity';
/**
 * A single shared promise that stays pending forever (its executor never calls
 * resolve or reject). `forkJoin` parks it in the slot of a source that has
 * finished, so that slot can no longer win a `Promise.race`.
 */
// tslint:disable-next-line:no-empty
const NEVER_PROMISE: Promise<unknown> = new Promise<unknown>(() => {});

/** A settled value paired with the position of the source that produced it. */
interface MergeResult<T> {
  /** The value the wrapped promise was fulfilled with. */
  value: T;
  /** Zero-based position of the originating source. */
  index: number;
}
/**
 * Tags the fulfilment value of `promise` with `index`, so that the winner of a
 * `Promise.race` over several such promises can be traced back to its source.
 *
 * Only a fulfilment handler is attached, so a rejection of `promise` passes
 * through unchanged (and without the index).
 *
 * @param promise The promise whose result should be tagged.
 * @param index Position of the source that `promise` belongs to.
 * @returns A promise for `{ value, index }`.
 */
function wrapPromiseWithIndex<T>(promise: Promise<T>, index: number): Promise<MergeResult<T>> {
  const tagWithIndex = (value: T): MergeResult<T> => ({ value, index });
  return promise.then(tagWithIndex);
}
/**
 * Drains all sources concurrently and resolves with the last value each one
 * produced.
 *
 * @param args The async iterables to run to completion.
 * @returns An array whose element `i` is the final value yielded by `args[i]`,
 * or `undefined` if at least one source completed without yielding anything.
 * With no sources the result is an empty array.
 * @throws Rejects with the first error raised by any source (a rejected
 * `next()`, or a synchronous throw while obtaining or advancing an iterator).
 * Before the error is rethrown, `return()` is invoked - without being awaited -
 * on every iterator that has not reported completion, so that still-running
 * sources get a chance to release their resources. Errors raised by that
 * cleanup are ignored in favour of the original error.
 */
export async function forkJoin<T>(...args: AsyncIterable<T>[]): Promise<T[] | undefined> {
  const length = args.length;
  const iterators = new Array<AsyncIterator<T>>(length);
  const nexts = new Array<Promise<MergeResult<IteratorResult<T>>>>(length);
  const values = new Array<T>(length);
  const hasValues = new Array<boolean>(length).fill(false);
  // completed[i] becomes true once source i has reported `done`; such sources
  // need no cleanup if another source fails later.
  const completed = new Array<boolean>(length).fill(false);
  let active = length;

  try {
    // Start every source and request its first element up front.
    for (let i = 0; i < length; i++) {
      const iterator = args[i][Symbol.asyncIterator]();
      iterators[i] = iterator;
      nexts[i] = wrapPromiseWithIndex(iterator.next(), i);
    }

    while (active > 0) {
      // Whichever source settles first is handled; the others stay pending in
      // `nexts` and take part in the next race.
      const { value: next$, index } = await Promise.race(nexts);
      if (next$.done) {
        // Park a never-settling promise in the slot so a finished source
        // cannot win a race again.
        nexts[index] = <Promise<MergeResult<IteratorResult<T>>>>NEVER_PROMISE;
        completed[index] = true;
        active--;
      } else {
        // Ask for the following element first, then remember the latest value.
        nexts[index] = wrapPromiseWithIndex(iterators[index].next(), index);
        hasValues[index] = true;
        values[index] = next$.value;
      }
    }
  } catch (err) {
    // A source failed: close every iterator that was created and has not
    // completed. The calls are fire-and-forget because a source with a
    // `next()` still in flight may not settle `return()` for a long time (or
    // ever), and the caller should see the original error promptly.
    for (let i = 0; i < length; i++) {
      const iterator = iterators[i];
      if (iterator === undefined || completed[i] || typeof iterator.return !== 'function') {
        continue;
      }
      try {
        Promise.resolve(iterator.return()).catch(() => {
          /* cleanup failures must not mask the original error */
        });
      } catch (cleanupError) {
        /* a synchronous throw from return() is ignored for the same reason */
      }
    }
    throw err;
  }

  // Only report a result when every source contributed at least one value.
  if (hasValues.every(identity)) {
    return values;
  }
  return undefined;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.