Last active
November 1, 2017 02:32
-
-
Save mattpodwysocki/678cac43245767e09e2ff976a245e745 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { AsyncIterableX } from './asynciterablex'; | |
import { identity } from '../internal/identity'; | |
/**
 * A promise that never settles.
 *
 * It is used as a placeholder in a `Promise.race` slot once the source that
 * owned the slot has completed, so that slot can never win another race.
 */
// tslint:disable-next-line:no-empty
const NEVER_PROMISE: Promise<never> = new Promise<never>(() => {});
/** A fulfilled value paired with the position of the source that produced it. */
type MergeResult<T> = { value: T; index: number };

/**
 * Tags the fulfillment value of `promise` with `index`.
 *
 * @param promise The promise whose result should be tagged.
 * @param index The position to attach to the result.
 * @returns A promise that fulfills with `{ value, index }`. A rejection of
 * `promise` is propagated unchanged.
 */
function wrapPromiseWithIndex<T>(promise: Promise<T>, index: number): Promise<MergeResult<T>> {
  return promise.then(value => ({ value, index }));
}
/**
 * Async iterable that combines the latest values of several source async
 * iterables.
 *
 * All sources are iterated concurrently. Once every source has produced at
 * least one value, each newly arriving value causes an array holding the most
 * recent value of every source (in source order) to be yielded. Iteration
 * ends when all sources have completed. If a source completes without ever
 * producing a value, nothing is yielded, but the remaining sources are still
 * drained until they complete.
 */
export class CombineLatestAsyncIterable<T> extends AsyncIterableX<T[]> {
  private readonly _source: AsyncIterable<T>[];

  /**
   * @param sources The async iterables whose latest values are combined.
   */
  constructor(sources: AsyncIterable<T>[]) {
    super();
    this._source = sources;
  }

  async *[Symbol.asyncIterator]() {
    const length = this._source.length;
    const iterators = new Array<AsyncIterator<T>>(length);
    const nexts = new Array<Promise<MergeResult<IteratorResult<T>>>>(length);
    const values = new Array<T>(length);
    const hasValues = new Array<boolean>(length);
    // Tracks which sources reported `done`, so only unfinished ones are closed on exit.
    const finished = new Array<boolean>(length);
    let hasValueAll = false;
    let active = length;
    hasValues.fill(false);
    finished.fill(false);

    try {
      // Start every source so that all of them have a pending `next()`.
      for (let i = 0; i < length; i++) {
        const iterator = this._source[i][Symbol.asyncIterator]();
        iterators[i] = iterator;
        nexts[i] = wrapPromiseWithIndex(iterator.next(), i);
      }

      while (active > 0) {
        const { value: next$, index } = await Promise.race(nexts);
        if (next$.done) {
          // A completed source must never win the race again.
          nexts[index] = <Promise<MergeResult<IteratorResult<T>>>>NEVER_PROMISE;
          finished[index] = true;
          active--;
        } else {
          values[index] = next$.value;
          hasValues[index] = true;
          nexts[index] = wrapPromiseWithIndex(iterators[index].next(), index);
          // `hasValueAll` latches to true, so the full scan stops once every source has emitted.
          if (hasValueAll || (hasValueAll = hasValues.every(identity))) {
            // Yield a copy: `values` keeps being mutated, and consumers may
            // hold on to earlier emissions.
            yield values.slice();
          }
        }
      }
    } finally {
      // Runs on normal completion, on early termination by the consumer and
      // when a source rejects. Close every source that has not completed.
      for (let i = 0; i < length; i++) {
        const iterator = iterators[i];
        if (finished[i] || !iterator || typeof iterator.return !== 'function') {
          continue;
        }
        try {
          // Deliberately not awaited: an async generator queues `return()`
          // behind its pending `next()`, which may never settle.
          Promise.resolve(iterator.return()).catch(() => undefined);
        } catch (e) {
          // Best-effort cleanup; a failing `return()` must not mask the
          // original completion or error.
        }
      }
    }
  }
}
/**
 * Combines `source` with the other given async iterables into a sequence of
 * arrays holding the latest value of each input, in argument order.
 *
 * @param source The first async iterable.
 * @param args The other async iterables to combine with `source`.
 * @returns A `CombineLatestAsyncIterable` over `source` followed by `args`.
 */
export function combineLatest<T, T2>(
  source: AsyncIterable<T>,
  v2: AsyncIterable<T2>
): AsyncIterableX<(T | T2)[]>;
export function combineLatest<T, T2, T3>(
  source: AsyncIterable<T>,
  v2: AsyncIterable<T2>,
  v3: AsyncIterable<T3>
): AsyncIterableX<(T | T2 | T3)[]>;
export function combineLatest<T, T2, T3, T4>(
  source: AsyncIterable<T>,
  v2: AsyncIterable<T2>,
  v3: AsyncIterable<T3>,
  v4: AsyncIterable<T4>
): AsyncIterableX<(T | T2 | T3 | T4)[]>;
export function combineLatest<T, T2, T3, T4, T5>(
  source: AsyncIterable<T>,
  v2: AsyncIterable<T2>,
  v3: AsyncIterable<T3>,
  v4: AsyncIterable<T4>,
  v5: AsyncIterable<T5>
): AsyncIterableX<(T | T2 | T3 | T4 | T5)[]>;
export function combineLatest<T, T2, T3, T4, T5, T6>(
  source: AsyncIterable<T>,
  v2: AsyncIterable<T2>,
  v3: AsyncIterable<T3>,
  v4: AsyncIterable<T4>,
  v5: AsyncIterable<T5>,
  v6: AsyncIterable<T6>
): AsyncIterableX<(T | T2 | T3 | T4 | T5 | T6)[]>;
// Fallback for any other number of same-typed sources; the implementation
// signature below is not visible to callers.
export function combineLatest<T>(
  source: AsyncIterable<T>,
  ...args: AsyncIterable<T>[]
): AsyncIterableX<T[]>;
export function combineLatest<T>(
  source: AsyncIterable<T>,
  ...args: AsyncIterable<T>[]
): AsyncIterableX<T[]> {
  return new CombineLatestAsyncIterable<T>([source, ...args]);
}
/**
 * Combines the given async iterables into a sequence of arrays holding the
 * latest value of each input, in argument order.
 *
 * @param args The async iterables to combine.
 * @returns A `CombineLatestAsyncIterable` over `args`.
 */
export function combineLatestStatic<T, T2>(
  v1: AsyncIterable<T>,
  v2: AsyncIterable<T2>
): AsyncIterableX<(T | T2)[]>;
export function combineLatestStatic<T, T2, T3>(
  v1: AsyncIterable<T>,
  v2: AsyncIterable<T2>,
  v3: AsyncIterable<T3>
): AsyncIterableX<(T | T2 | T3)[]>;
export function combineLatestStatic<T, T2, T3, T4>(
  v1: AsyncIterable<T>,
  v2: AsyncIterable<T2>,
  v3: AsyncIterable<T3>,
  v4: AsyncIterable<T4>
): AsyncIterableX<(T | T2 | T3 | T4)[]>;
export function combineLatestStatic<T, T2, T3, T4, T5>(
  v1: AsyncIterable<T>,
  v2: AsyncIterable<T2>,
  v3: AsyncIterable<T3>,
  v4: AsyncIterable<T4>,
  v5: AsyncIterable<T5>
): AsyncIterableX<(T | T2 | T3 | T4 | T5)[]>;
export function combineLatestStatic<T, T2, T3, T4, T5, T6>(
  v1: AsyncIterable<T>,
  v2: AsyncIterable<T2>,
  v3: AsyncIterable<T3>,
  v4: AsyncIterable<T4>,
  v5: AsyncIterable<T5>,
  v6: AsyncIterable<T6>
): AsyncIterableX<(T | T2 | T3 | T4 | T5 | T6)[]>;
// Fallback for any other number of same-typed sources; the implementation
// signature below is not visible to callers.
export function combineLatestStatic<T>(...args: AsyncIterable<T>[]): AsyncIterableX<T[]>;
export function combineLatestStatic<T>(...args: AsyncIterable<T>[]): AsyncIterableX<T[]> {
  return new CombineLatestAsyncIterable<T>(args);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment