Skip to content

Instantly share code, notes, and snippets.

@mattpodwysocki
Created April 2, 2020 23:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattpodwysocki/91536cbaf787361e3bda9bd10a01ec76 to your computer and use it in GitHub Desktop.
Save mattpodwysocki/91536cbaf787361e3bda9bd10a01ec76 to your computer and use it in GitHub Desktop.
import { identity, identityAsync } from '../util/identity';
import { wrapWithAbort } from './operators/withabort';
/**
 * A promise that never settles: its executor calls neither `resolve` nor
 * `reject`. It is used as a placeholder inside `Promise.race` for slots that
 * must never win the race again.
 */
const NEVER_PROMISE = new Promise<unknown>(() => undefined);
/** A settled value paired with the position of the source that produced it. */
interface MergeResult<T> {
  value: T;
  index: number;
}

/**
 * Tags the fulfillment value of `promise` with `index`, so that the winner of
 * a `Promise.race` can be traced back to its slot. A rejection of `promise`
 * passes through unchanged.
 *
 * @param promise The promise whose result should be tagged.
 * @param index The slot number to attach to the result.
 * @returns A promise for `{ value, index }`.
 */
function wrapPromiseWithIndex<T>(promise: Promise<T>, index: number): Promise<MergeResult<T>> {
  const tagWithIndex = (value: T): MergeResult<T> => ({ value, index });
  return promise.then(tagWithIndex);
}
/**
 * Options accepted by `forkJoin`.
 *
 * @typeParam T The element type of the source sequences.
 * @typeParam R The type produced by the selector.
 */
export interface ForkJoinOptions<T, R> {
  /**
   * Combines the last value of every source into the final result. It is
   * invoked with the collected values and the abort signal, and may return
   * the result directly or as a promise.
   */
  selector?: (args: T[], signal?: AbortSignal) => R | Promise<R>;
  /** The `this` value the selector is invoked with. */
  thisArg?: any;
  /** Abort signal applied to every source and forwarded to the selector. */
  signal?: AbortSignal;
}
/**
 * Drains every source concurrently and combines the last value each one
 * produced.
 *
 * All sources are iterated at the same time; each new value from a source
 * overwrites that source's previously stored value. Once every source has
 * completed, the selector is called with the array of last values (in source
 * order) and the abort signal.
 *
 * @param sources The async-iterable sequences to run to completion.
 * @param options Optional selector, `this` binding for the selector, and
 * abort signal. When `options` or `options.selector` is omitted, the
 * `identityAsync` helper is used as the selector (presumably yielding the
 * collected values unchanged; confirm against `../util/identity`).
 * @returns The selector's result, or `undefined` when `sources` is empty or
 * at least one source completed without producing any value.
 * @throws Whatever a source's `next()` or the selector rejects/throws with.
 */
// eslint-disable-next-line complexity
export async function forkJoin<T, R>(
  sources: AsyncIterable<T>[],
  options?: ForkJoinOptions<T, R>
): Promise<R | undefined> {
  // Default the selector per-field, not just when `options` is missing:
  // callers passing only `{ signal }` or `{ thisArg }` used to hit a
  // TypeError on `selector.call` after all sources had been drained.
  const defaults = { selector: identityAsync } as ForkJoinOptions<T, R>;
  const { selector = defaults.selector, thisArg, signal } = options || defaults;

  const length = sources.length;
  const iterators = new Array<AsyncIterator<T>>(length);
  const nexts = new Array<Promise<MergeResult<IteratorResult<T>>>>(length);
  let active = length;
  const values = new Array<T>(length);
  const hasValues = new Array<boolean>(length);
  hasValues.fill(false);

  // Start every source and request its first item, tagging each pending
  // result with the source's position.
  for (let i = 0; i < length; i++) {
    const iterator = wrapWithAbort(sources[i], signal)[Symbol.asyncIterator]();
    iterators[i] = iterator;
    nexts[i] = wrapPromiseWithIndex(iterator.next(), i);
  }

  while (active > 0) {
    const next = Promise.race(nexts);
    const { value: next$, index } = await next;
    if (next$.done) {
      // Park a never-settling promise in the finished slot so it cannot win
      // another race, while keeping the array positions stable.
      nexts[index] = <Promise<MergeResult<IteratorResult<T>>>>NEVER_PROMISE;
      active--;
    } else {
      // Record the latest value and immediately request the next one.
      const iterator$ = iterators[index];
      nexts[index] = wrapPromiseWithIndex(iterator$.next(), index);
      hasValues[index] = true;
      values[index] = next$.value;
    }
  }

  // Only produce a result when there was at least one source and every
  // source yielded at least one value.
  if (hasValues.length > 0 && hasValues.every(identity)) {
    // `selector` is always defined here thanks to the default above.
    return await selector!.call(thisArg, values, signal);
  }
  return undefined;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment