Skip to content

Instantly share code, notes, and snippets.

@mattpodwysocki
Last active October 5, 2017 05:39
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save mattpodwysocki/2b92c53c3bed2799c1470e60780c0395 to your computer and use it in GitHub Desktop.
import { AsyncIterableX } from '../asynciterable';
// tslint:disable-next-line:no-empty
const NEVER_PROMISE = new Promise(() => { });
class MergeAsyncIterable<T> extends AsyncIterableX<T> {
private _source: AsyncIterable<T>[];
constructor(source: AsyncIterable<T>[]) {
super();
this._source = source;
}
async *[Symbol.asyncIterator](): AsyncIterator<T> {
const length = this._source.length;
const iterators = new Array<AsyncIterator<T>>(length);
const nexts = new Array<Promise<{value: IteratorResult<T>, i: number}>>(length);
let active = length;
for (let i = 0; i < length; i++) {
const iterator = this._source[i][Symbol.asyncIterator]();
iterators[i] = iterator;
nexts[i] = iterator.next().then(value => { return { value, i }; });
}
while (active > 0) {
const { value, i } = await Promise.race(nexts);
if (value.done) {
nexts[i] = <Promise<{value: IteratorResult<T>, i: number}>>(NEVER_PROMISE);
active--;
} else {
const iterator$ = iterators[i];
nexts[i] = iterator$.next().then(value => { return { value, i }; });
yield value.value;
}
}
}
}
export function _mergeAll<TSource>(source: AsyncIterable<TSource>[]): AsyncIterableX<TSource> {
return new MergeAsyncIterable<TSource>(source);
}
export function merge<T>(source: AsyncIterable<T>, ...args: AsyncIterable<T>[]): AsyncIterableX<T> {
return new MergeAsyncIterable<T>([source, ...args]);
}
export function mergeStatic<T>(...args: AsyncIterable<T>[]): AsyncIterableX<T> {
return new MergeAsyncIterable<T>(args);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment