Created
November 1, 2017 03:08
-
-
Save mattpodwysocki/115286e47e795e7715566e6d20c4c77e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { identity } from '../internal/identity';
/**
 * A promise that never settles.
 *
 * It is swapped into a `Promise.race` slot for a source that has already completed, so that
 * slot can no longer win a race while the array keeps its positional layout.
 */
// tslint:disable-next-line:no-empty
const NEVER_PROMISE = new Promise<never>(() => {});
/** A settled value paired with the position of the source that produced it. */
type MergeResult<T> = { value: T; index: number };

/**
 * Tags the eventual value of `promise` with `index`, so that the winner of a `Promise.race`
 * can be traced back to the slot it came from.
 *
 * @param promise The promise whose fulfilment value should be tagged.
 * @param index The slot number to attach to the value.
 * @returns A promise of `{ value, index }`; a rejection of `promise` propagates unchanged.
 */
function wrapPromiseWithIndex<T>(promise: Promise<T>, index: number): Promise<MergeResult<T>> {
  return promise.then(value => ({ value, index }));
}
/**
 * Runs every source to completion concurrently and resolves with the last value each produced.
 *
 * All sources are advanced in parallel. Each time a source yields, its slot in the result is
 * overwritten, so once every source has completed the array holds the final value of each
 * source, in argument order.
 *
 * @param args The async-iterable sources to drain.
 * @returns The last value of each source, in argument order, or `undefined` when at least one
 * source completed without yielding a value. With no sources the result is an empty array.
 * @throws Rejects with the first error raised while obtaining an iterator or advancing a
 * source. Sources that are still open at that point are asked to close via `return()`.
 */
export async function forkJoin<T>(...args: AsyncIterable<T>[]): Promise<T[] | undefined> {
  const length = args.length;
  const iterators = new Array<AsyncIterator<T>>(length);
  const nexts = new Array<Promise<MergeResult<IteratorResult<T>>>>(length);
  const values = new Array<T>(length);
  const hasValues = new Array<boolean>(length);
  hasValues.fill(false);
  // Tracks which sources have reported `done`, so the failure path only closes open ones.
  const completed = new Array<boolean>(length);
  completed.fill(false);
  let active = length;

  try {
    // Start every source at once; each pending result remembers which slot it belongs to.
    for (let i = 0; i < length; i++) {
      const iterator = args[i][Symbol.asyncIterator]();
      iterators[i] = iterator;
      nexts[i] = wrapPromiseWithIndex(iterator.next(), i);
    }

    while (active > 0) {
      // NOTE(review): each race also subscribes to the module-level NEVER_PROMISE placed in
      // completed slots; confirm that is acceptable for very long-running sources.
      const { value: next$, index } = await Promise.race(nexts);
      if (next$.done) {
        // Park a never-settling promise in the slot so this source cannot win another race.
        nexts[index] = NEVER_PROMISE as Promise<MergeResult<IteratorResult<T>>>;
        completed[index] = true;
        active--;
      } else {
        // Request the following item right away, then record the newest value for the slot.
        nexts[index] = wrapPromiseWithIndex(iterators[index].next(), index);
        hasValues[index] = true;
        values[index] = next$.value;
      }
    }
  } catch (error) {
    // A source failed: release the ones that are still open instead of abandoning them.
    for (let i = 0; i < length; i++) {
      const iterator = iterators[i];
      if (!iterator || completed[i]) {
        continue;
      }

      // If setup failed part-way, this promise may never have been raced; mark it as handled
      // so a later rejection is not reported as unhandled.
      const pending = nexts[i];
      if (pending) {
        pending.catch(() => {
          // The error being rethrown below is the one the caller sees.
        });
      }

      if (typeof iterator.return === 'function') {
        try {
          // Deliberately not awaited: a close request can queue behind a slow in-flight
          // `next()`, and that must not delay reporting the original error.
          Promise.resolve(iterator.return()).catch(() => {
            // Closing is best-effort; the original error takes precedence.
          });
        } catch (ignored) {
          // Closing is best-effort; the original error takes precedence.
        }
      }
    }
    throw error;
  }

  // Only a complete set of values counts as a result.
  if (hasValues.every(identity)) {
    return values;
  }

  return undefined;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment