Last active
November 3, 2017 00:14
-
-
Save mattpodwysocki/14a0d9ebc9f236948e42a46fd45ddb67 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { identity, identityAsync } from '../internal/identity'; | |
// tslint:disable-next-line:no-empty | |
const NEVER_PROMISE = new Promise(() => {}); | |
type MergeResult<T> = { value: T; index: number }; | |
function wrapPromiseWithIndex<T>(promise: Promise<T>, index: number) { | |
return promise.then(value => ({ value, index })) as Promise<MergeResult<T>>; | |
} | |
export function forkJoin<T, T2>( | |
source: AsyncIterable<T>, | |
source2: AsyncIterable<T2> | |
): Promise<[T, T2] | undefined>; | |
export function forkJoin<T, T2, T3>( | |
source: AsyncIterable<T>, | |
source2: AsyncIterable<T2>, | |
source3: AsyncIterable<T3> | |
): Promise<[T, T2, T3] | undefined>; | |
export function forkJoin<T, T2, T3, T4>( | |
source: AsyncIterable<T>, | |
source2: AsyncIterable<T2>, | |
source3: AsyncIterable<T3>, | |
source4: AsyncIterable<T4> | |
): Promise<[T, T2, T3, T4] | undefined>; | |
export function forkJoin<T, T2, T3, T4, T5>( | |
source: AsyncIterable<T>, | |
source2: AsyncIterable<T2>, | |
source3: AsyncIterable<T3>, | |
source4: AsyncIterable<T4>, | |
source5: AsyncIterable<T5> | |
): Promise<[T, T2, T3, T4, T5] | undefined>; | |
export function forkJoin<T, T2, T3, T4, T5, T6>( | |
source: AsyncIterable<T>, | |
source2: AsyncIterable<T2>, | |
source3: AsyncIterable<T3>, | |
source4: AsyncIterable<T4>, | |
source5: AsyncIterable<T5>, | |
source6: AsyncIterable<T6> | |
): Promise<[T, T2, T3, T4, T5, T6] | undefined>; | |
export function forkJoin<T, R>( | |
project: (values: [T]) => R | Promise<R>, | |
source: AsyncIterable<T> | |
): Promise<R | undefined>; | |
export function forkJoin<T, T2, R>( | |
project: (values: [T, T2]) => R | Promise<R>, | |
source: AsyncIterable<T>, | |
source2: AsyncIterable<T2> | |
): Promise<R | undefined>; | |
export function forkJoin<T, T2, T3, R>( | |
project: (values: [T, T2, T3]) => R | Promise<R>, | |
source: AsyncIterable<T>, | |
source2: AsyncIterable<T2>, | |
source3: AsyncIterable<T3> | |
): Promise<R | undefined>; | |
export function forkJoin<T, T2, T3, T4, R>( | |
project: (values: [T, T2, T3, T4]) => R | Promise<R>, | |
source: AsyncIterable<T>, | |
source2: AsyncIterable<T2>, | |
source3: AsyncIterable<T3>, | |
source4: AsyncIterable<T4> | |
): Promise<R | undefined>; | |
export function forkJoin<T, T2, T3, T4, T5, R>( | |
project: (values: [T, T2, T3, T4, T5]) => R | Promise<R>, | |
source: AsyncIterable<T>, | |
source2: AsyncIterable<T2>, | |
source3: AsyncIterable<T3>, | |
source4: AsyncIterable<T4>, | |
source5: AsyncIterable<T5> | |
): Promise<R | undefined>; | |
export function forkJoin<T, T2, T3, T4, T5, T6, R>( | |
project: (values: [T, T2, T3, T4, T5, T6]) => R | Promise<R>, | |
source: AsyncIterable<T>, | |
source2: AsyncIterable<T2>, | |
source3: AsyncIterable<T3>, | |
source4: AsyncIterable<T4>, | |
source5: AsyncIterable<T5>, | |
source6: AsyncIterable<T6> | |
): Promise<R | undefined>; | |
export function forkJoin<T>(...sources: AsyncIterable<T>[]): Promise<T[] | undefined>; | |
export function forkJoin<T, R>( | |
project: (values: T[]) => R | Promise<R>, | |
...sources: AsyncIterable<T>[] | |
): Promise<R | undefined>; | |
export async function forkJoin<T, R>(...sources: any[]): Promise<R | undefined> { | |
let fn = sources.shift() as (values: any[]) => R | Promise<R>; | |
if (typeof fn !== 'function') { | |
sources.push(fn); | |
fn = identityAsync; | |
} | |
const length = sources.length; | |
const iterators = new Array<AsyncIterator<T>>(length); | |
const nexts = new Array<Promise<MergeResult<IteratorResult<T>>>>(length); | |
let active = length; | |
const values = new Array<T>(length); | |
const hasValues = new Array<boolean>(length); | |
hasValues.fill(false); | |
for (let i = 0; i < length; i++) { | |
const iterator = sources[i][Symbol.asyncIterator](); | |
iterators[i] = iterator; | |
nexts[i] = wrapPromiseWithIndex(iterator.next(), i); | |
} | |
while (active > 0) { | |
const next = Promise.race(nexts); | |
const { value: next$, index } = await next; | |
if (next$.done) { | |
nexts[index] = <Promise<MergeResult<IteratorResult<T>>>>NEVER_PROMISE; | |
active--; | |
} else { | |
console.log(`${index} - ${next$.value}`); | |
const iterator$ = iterators[index]; | |
nexts[index] = wrapPromiseWithIndex(iterator$.next(), index); | |
hasValues[index] = true; | |
values[index] = next$.value; | |
} | |
} | |
if (hasValues.every(identity)) { | |
return await fn(values); | |
} | |
return undefined; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var forkJoin = require('./targets/es2015/cjs/asynciterable/forkjoin').forkJoin; | |
function delayValue(value, time) { | |
return new Promise(res => setTimeout(() => res(value), time)); | |
} | |
async function* gen1() { | |
yield await delayValue(1, 250); | |
yield await delayValue(2, 250); | |
yield await delayValue(3, 250); | |
} | |
async function* gen2() { | |
yield await delayValue(2, 300); | |
yield await delayValue(3, 300); | |
yield await delayValue(1, 300); | |
} | |
async function* gen3() { | |
yield await delayValue(3, 300); | |
yield await delayValue(1, 300); | |
yield await delayValue(2, 300); | |
} | |
var result = forkJoin(gen1(), gen2(), gen3()); | |
result.then(console.log); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
➜ IxJS git:(master) ✗ node --harmony-async-iteration foo.js | |
2 - 1 | |
0 - 2 | |
1 - 3 | |
2 - 2 | |
0 - 3 | |
1 - 1 | |
2 - 3 | |
0 - 1 | |
1 - 2 | |
[ 1, 2, 3 ] |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment