Created
April 2, 2020 23:15
-
-
Save mattpodwysocki/91536cbaf787361e3bda9bd10a01ec76 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { identity, identityAsync } from '../util/identity'; | |
import { wrapWithAbort } from './operators/withabort'; | |
/**
 * A promise that never settles. `forkJoin` parks it in the slot of a finished
 * source so that `Promise.race` keeps ignoring that slot.
 */
// eslint-disable-next-line @typescript-eslint/no-empty-function
const NEVER_PROMISE = new Promise<never>(() => {});
/** A settled value paired with the position of the source that produced it. */
type MergeResult<T> = { value: T; index: number };

/**
 * Tags the fulfilment value of `promise` with `index`, so that the winner of a
 * `Promise.race` over several such promises can be traced back to its slot.
 * A rejection of `promise` propagates unchanged.
 *
 * @param promise The promise whose result should be tagged.
 * @param index The slot number to attach to the result.
 * @returns A promise for `{ value, index }`.
 */
function wrapPromiseWithIndex<T>(promise: Promise<T>, index: number): Promise<MergeResult<T>> {
  const tagWithIndex = (value: T): MergeResult<T> => ({ value, index });
  return promise.then(tagWithIndex);
}
/**
 * Options accepted by `forkJoin`.
 */
export interface ForkJoinOptions<T, R> {
  /** Value used as `this` when `selector` is invoked. */
  thisArg?: any;
  /**
   * Projects the collected values (one per source, in source order) into the
   * final result. Receives the same `signal` that was passed in these options.
   */
  selector?: (args: T[], signal?: AbortSignal) => R | Promise<R>;
  /** Abort signal applied to every source and forwarded to `selector`. */
  signal?: AbortSignal;
}
/**
 * Drains every source concurrently and, once all of them have completed,
 * passes the last value seen from each source (in source order) to the
 * selector.
 *
 * @param sources Async iterables to drain. Each one is wrapped with
 *   `wrapWithAbort` using `options.signal` before iteration starts.
 * @param options Optional `selector`, `thisArg` and `signal`. When `selector`
 *   is not supplied, `identityAsync` is used in its place.
 * @returns The selector's result, or `undefined` when `sources` is empty or at
 *   least one source completed without yielding a value.
 */
// eslint-disable-next-line complexity
export async function forkJoin<T, R>(
  sources: AsyncIterable<T>[],
  options?: ForkJoinOptions<T, R>
): Promise<R | undefined> {
  const opts: ForkJoinOptions<T, R> = options || {};
  const { thisArg, signal } = opts;
  // Default the selector on its own: options such as `{ signal }` carry no
  // selector, and calling an undefined selector would throw after all the
  // sources had already been consumed.
  const selector =
    opts.selector || (identityAsync as NonNullable<ForkJoinOptions<T, R>['selector']>);

  const length = sources.length;
  const iterators = new Array<AsyncIterator<T>>(length);
  const nexts = new Array<Promise<MergeResult<IteratorResult<T>>>>(length);

  // Number of sources that have not reported `done` yet.
  let active = length;
  const values = new Array<T>(length);
  const hasValues = new Array<boolean>(length);
  hasValues.fill(false);

  // Start every source and remember its first pending `next()`.
  for (let i = 0; i < length; i++) {
    const iterator = wrapWithAbort(sources[i], signal)[Symbol.asyncIterator]();
    iterators[i] = iterator;
    nexts[i] = wrapPromiseWithIndex(iterator.next(), i);
  }

  while (active > 0) {
    const { value: next$, index } = await Promise.race(nexts);
    if (next$.done) {
      // Park a never-settling promise in this slot so later races skip it.
      nexts[index] = NEVER_PROMISE as Promise<MergeResult<IteratorResult<T>>>;
      active--;
    } else {
      // Keep only the most recent value per source and request the next one.
      nexts[index] = wrapPromiseWithIndex(iterators[index].next(), index);
      hasValues[index] = true;
      values[index] = next$.value;
    }
  }

  // A result is produced only when every source yielded at least once.
  if (hasValues.length > 0 && hasValues.every(identity)) {
    return await selector.call(thisArg, values, signal);
  }

  return undefined;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment