Skip to content

Instantly share code, notes, and snippets.

@mattpodwysocki
Last active November 1, 2017 02:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattpodwysocki/678cac43245767e09e2ff976a245e745 to your computer and use it in GitHub Desktop.
Save mattpodwysocki/678cac43245767e09e2ff976a245e745 to your computer and use it in GitHub Desktop.
import { AsyncIterableX } from './asynciterablex';
import { identity } from '../internal/identity';
/**
 * A promise that stays pending forever: its executor never calls `resolve` or `reject`.
 * The combine-latest loop in this file parks it in the slot of a source that has finished,
 * so that `Promise.race` can only be decided by sources that are still producing values.
 */
const NEVER_PROMISE = new Promise(() => {
  /* intentionally left blank so the promise never settles */
});

/** A settled value paired with the position (`index`) of the source it came from. */
type MergeResult<T> = {
  value: T;
  index: number;
};
/**
 * Tags the eventual result of `promise` with `index`, so that the winner of a
 * `Promise.race` over several wrapped promises can be traced back to its slot.
 *
 * @param promise The promise whose fulfilment value should be tagged.
 * @param index The position to attach to the fulfilment value.
 * @returns A promise fulfilling with `{ value, index }`; a rejection of `promise`
 * propagates unchanged because no rejection handler is attached.
 */
function wrapPromiseWithIndex<T>(promise: Promise<T>, index: number): Promise<MergeResult<T>> {
  return promise.then((value): MergeResult<T> => {
    return { value, index };
  });
}
/**
 * Async iterable that combines several source async iterables.
 *
 * All sources are pulled concurrently. Once every source has produced at least one
 * value, each new value from any source yields an array holding the most recent value
 * of every source, in source order. Iteration ends when all sources are done.
 */
export class CombineLatestAsyncIterable<T> extends AsyncIterableX<T[]> {
  private _source: AsyncIterable<T>[];

  /**
   * @param sources The async iterables to combine; their order fixes the order of the
   * entries in each yielded array.
   */
  constructor(sources: AsyncIterable<T>[]) {
    super();
    this._source = sources;
  }

  /**
   * Races the pending `next()` of every source and yields a snapshot of the latest
   * values each time one of them produces a value (after all have produced one).
   *
   * A rejection from any source's `next()` propagates to the consumer.
   */
  async *[Symbol.asyncIterator]() {
    const length = this._source.length;
    const iterators = new Array<AsyncIterator<T>>(length);
    const nexts = new Array<Promise<MergeResult<IteratorResult<T>>>>(length);
    const values = new Array<T>(length);
    const hasValues = new Array<boolean>(length);
    // Tracks which sources reported `done`, so cleanup only touches the open ones.
    const finished = new Array<boolean>(length);
    let hasValueAll = false;
    let active = length;
    hasValues.fill(false);
    finished.fill(false);

    try {
      for (let i = 0; i < length; i++) {
        const iterator = this._source[i][Symbol.asyncIterator]();
        iterators[i] = iterator;
        nexts[i] = wrapPromiseWithIndex(iterator.next(), i);
      }

      while (active > 0) {
        const { value: next$, index } = await Promise.race(nexts);
        if (next$.done) {
          // Park a never-settling promise in this slot so later races ignore it.
          nexts[index] = <Promise<MergeResult<IteratorResult<T>>>>NEVER_PROMISE;
          finished[index] = true;
          active--;
        } else {
          values[index] = next$.value;
          hasValues[index] = true;
          // Request the next value right away so the source keeps running while
          // the consumer processes the yielded snapshot.
          nexts[index] = wrapPromiseWithIndex(iterators[index].next(), index);
          if (hasValueAll || (hasValueAll = hasValues.every(identity))) {
            // Yield a copy: `values` keeps being mutated, and handing out the same
            // array would silently rewrite results the consumer already received.
            yield values.slice();
          }
        }
      }
    } finally {
      // Reached on normal completion, on a source error, and when the consumer stops
      // early. Ask every still-open source to release its resources.
      for (let i = 0; i < length; i++) {
        const iterator = iterators[i];
        if (!iterator || finished[i] || typeof iterator.return !== 'function') {
          continue;
        }
        try {
          // Deliberately not awaited: a source stuck in a pending `next()` would
          // otherwise block the consumer from finishing. Cleanup is best effort.
          Promise.resolve(iterator.return()).catch(() => {
            /* ignore cleanup failures */
          });
        } catch (_) {
          /* ignore synchronous cleanup failures */
        }
      }
    }
  }
}
/**
 * Combines `source` with the other given async iterables. The result yields an array
 * of the latest value from each input whenever any input produces a value, once every
 * input has produced at least one.
 *
 * Every overload returns `AsyncIterableX`, matching the implementation.
 *
 * @param source The first async iterable.
 * @param args The remaining async iterables, in the order their values should appear.
 * @returns An async iterable of arrays of latest values, in argument order.
 */
export function combineLatest<T, T2>(
  source: AsyncIterable<T>,
  v2: AsyncIterable<T2>
): AsyncIterableX<(T | T2)[]>;
export function combineLatest<T, T2, T3>(
  source: AsyncIterable<T>,
  v2: AsyncIterable<T2>,
  v3: AsyncIterable<T3>
): AsyncIterableX<(T | T2 | T3)[]>;
export function combineLatest<T, T2, T3, T4>(
  source: AsyncIterable<T>,
  v2: AsyncIterable<T2>,
  v3: AsyncIterable<T3>,
  v4: AsyncIterable<T4>
): AsyncIterableX<(T | T2 | T3 | T4)[]>;
export function combineLatest<T, T2, T3, T4, T5>(
  source: AsyncIterable<T>,
  v2: AsyncIterable<T2>,
  v3: AsyncIterable<T3>,
  v4: AsyncIterable<T4>,
  v5: AsyncIterable<T5>
): AsyncIterableX<(T | T2 | T3 | T4 | T5)[]>;
export function combineLatest<T, T2, T3, T4, T5, T6>(
  source: AsyncIterable<T>,
  v2: AsyncIterable<T2>,
  v3: AsyncIterable<T3>,
  v4: AsyncIterable<T4>,
  v5: AsyncIterable<T5>,
  v6: AsyncIterable<T6>
): AsyncIterableX<(T | T2 | T3 | T4 | T5 | T6)[]>;
// Variadic overload: the implementation signature itself is not callable, so without
// this, passing more than six sources would not type-check.
export function combineLatest<T>(
  source: AsyncIterable<T>,
  ...args: AsyncIterable<T>[]
): AsyncIterableX<T[]>;
export function combineLatest<T>(
  source: AsyncIterable<T>,
  ...args: AsyncIterable<T>[]
): AsyncIterableX<T[]> {
  return new CombineLatestAsyncIterable<T>([source, ...args]);
}
/**
 * Combines the given async iterables. The result yields an array of the latest value
 * from each input whenever any input produces a value, once every input has produced
 * at least one.
 *
 * Every overload returns `AsyncIterableX`, matching the implementation.
 *
 * @param args The async iterables to combine, in the order their values should appear.
 * @returns An async iterable of arrays of latest values, in argument order.
 */
export function combineLatestStatic<T, T2>(
  v1: AsyncIterable<T>,
  v2: AsyncIterable<T2>
): AsyncIterableX<(T | T2)[]>;
export function combineLatestStatic<T, T2, T3>(
  v1: AsyncIterable<T>,
  v2: AsyncIterable<T2>,
  v3: AsyncIterable<T3>
): AsyncIterableX<(T | T2 | T3)[]>;
export function combineLatestStatic<T, T2, T3, T4>(
  v1: AsyncIterable<T>,
  v2: AsyncIterable<T2>,
  v3: AsyncIterable<T3>,
  v4: AsyncIterable<T4>
): AsyncIterableX<(T | T2 | T3 | T4)[]>;
export function combineLatestStatic<T, T2, T3, T4, T5>(
  v1: AsyncIterable<T>,
  v2: AsyncIterable<T2>,
  v3: AsyncIterable<T3>,
  v4: AsyncIterable<T4>,
  v5: AsyncIterable<T5>
): AsyncIterableX<(T | T2 | T3 | T4 | T5)[]>;
export function combineLatestStatic<T, T2, T3, T4, T5, T6>(
  v1: AsyncIterable<T>,
  v2: AsyncIterable<T2>,
  v3: AsyncIterable<T3>,
  v4: AsyncIterable<T4>,
  v5: AsyncIterable<T5>,
  v6: AsyncIterable<T6>
): AsyncIterableX<(T | T2 | T3 | T4 | T5 | T6)[]>;
// Variadic overload: the implementation signature itself is not callable, so without
// this, passing more than six sources would not type-check.
export function combineLatestStatic<T>(...args: AsyncIterable<T>[]): AsyncIterableX<T[]>;
export function combineLatestStatic<T>(...args: AsyncIterable<T>[]): AsyncIterableX<T[]> {
  return new CombineLatestAsyncIterable<T>(args);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment