Skip to content

Instantly share code, notes, and snippets.

@mattpodwysocki
Last active April 28, 2020 15:12
Show Gist options
  • Save mattpodwysocki/844032c05b21fdba45e8771128e62331 to your computer and use it in GitHub Desktop.
Save mattpodwysocki/844032c05b21fdba45e8771128e62331 to your computer and use it in GitHub Desktop.
import { AsyncIterableX } from '../asynciterablex';
import { OperatorAsyncFunction } from '../../interfaces';
import { wrapWithAbort } from './withabort';
import { throwIfAborted } from '../../aborterror';
import { identity } from '../../util/identity';
// A promise that never settles: its executor never calls resolve or reject.
// It is swapped into the `nexts` slot of a secondary iterator that has completed,
// so that slot can never win a later `Promise.race`.
// eslint-disable-next-line @typescript-eslint/no-empty-function
const NEVER_PROMISE = new Promise(() => {});
/** A resolved value tagged with the position of the iterator that produced it. */
type MergeResult<T> = {
  value: T;
  index: number;
};

/**
 * Chains onto `promise` so that its resolved value is delivered together with `index`.
 * No rejection handler is attached, so a rejection of `promise` rejects the returned promise.
 *
 * @param promise The promise whose resolved value should be tagged.
 * @param index The position to attach to the resolved value.
 * @returns A promise resolving to `{ value, index }`.
 */
function wrapPromiseWithIndex<T>(promise: Promise<T>, index: number): Promise<MergeResult<T>> {
  const tagWithIndex = (value: T): MergeResult<T> => ({ value, index });
  return promise.then(tagWithIndex);
}
/**
 * Async-iterable that, for every value produced by a primary source, yields an array made of
 * that value followed by the most recent value seen from each of the other sequences.
 *
 * Nothing is yielded for a source value until every other sequence has produced at least one
 * value. Iteration ends when the primary source completes; completion of one of the other
 * sequences only stops that sequence from being polled (its last value keeps being used).
 */
export class WithLatestFromAsyncIterable<TSource> extends AsyncIterableX<TSource[]> {
  private readonly _source: AsyncIterable<TSource>;
  private readonly _others: AsyncIterable<TSource>[];

  /**
   * @param source The primary sequence; each of its values may trigger an emission.
   * @param others The sequences whose latest values are appended to each emission.
   */
  constructor(source: AsyncIterable<TSource>, others: AsyncIterable<TSource>[]) {
    super();
    this._source = source;
    this._others = others;
  }

  /**
   * Iterates the combined sequence.
   *
   * @param signal Optional abort signal; it is checked up front with `throwIfAborted` and
   * forwarded to every underlying sequence through `wrapWithAbort`.
   */
  async *[Symbol.asyncIterator](signal?: AbortSignal) {
    throwIfAborted(signal);

    const length = this._others.length;
    const newLength = length + 1;
    // Slots 0..length-1 belong to the other sequences; slot `length` is the primary source.
    const iterators = new Array<AsyncIterator<TSource>>(newLength);
    const nexts = new Array<Promise<MergeResult<IteratorResult<TSource>>>>(newLength);

    // With no other sequences there is nothing to wait for, so every source value is emitted.
    let hasValueAll = length === 0;
    const hasValue = new Array<boolean>(length).fill(false);
    const values = new Array<TSource>(length);

    for (let i = 0; i < length; i++) {
      const iterator = wrapWithAbort(this._others[i], signal)[Symbol.asyncIterator]();
      iterators[i] = iterator;
      nexts[i] = wrapPromiseWithIndex(iterator.next(), i);
    }

    const it = wrapWithAbort(this._source, signal)[Symbol.asyncIterator]();
    iterators[length] = it;
    nexts[length] = wrapPromiseWithIndex(it.next(), length);

    // NOTE(review): the other iterators are not explicitly closed (no `return()` call) when the
    // source completes or the consumer stops early — confirm whether cleanup is needed here.
    for (;;) {
      const {
        value: { value: value$, done: done$ },
        index,
      } = await Promise.race(nexts);

      if (index === length) {
        if (done$) {
          break;
        }

        // Re-arm the source slot before yielding. Without this the already-settled promise
        // would win every subsequent race and the same source value would be seen forever.
        nexts[index] = wrapPromiseWithIndex(it.next(), index);

        if (hasValueAll) {
          yield [value$, ...values];
        }
      } else if (done$) {
        // A finished secondary sequence must never win the race again; keep its last value.
        nexts[index] = NEVER_PROMISE as Promise<MergeResult<IteratorResult<TSource>>>;
      } else {
        values[index] = value$;
        // Flags are never cleared, so once every slot is filled the scan can be skipped.
        if (!hasValueAll) {
          hasValue[index] = true;
          hasValueAll = hasValue.every(identity);
        }
        nexts[index] = wrapPromiseWithIndex(iterators[index].next(), index);
      }
    }
  }
}
/**
 * Creates an operator that combines each value of the piped source with the latest values of
 * the supplied sequences by wrapping them in a `WithLatestFromAsyncIterable`.
 *
 * @param source2 The sequence whose latest value is paired with each source value (further
 * overloads accept up to five such sequences).
 * @returns An operator function that takes the source sequence and returns the combined one.
 */
export function withLatestFrom<T, T2>(
  source2: AsyncIterable<T2>
): OperatorAsyncFunction<T, [T, T2]>;
export function withLatestFrom<T, T2, T3>(
  source2: AsyncIterable<T2>,
  source3: AsyncIterable<T3>
): OperatorAsyncFunction<T, [T, T2, T3]>;
export function withLatestFrom<T, T2, T3, T4>(
  source2: AsyncIterable<T2>,
  source3: AsyncIterable<T3>,
  source4: AsyncIterable<T4>
): OperatorAsyncFunction<T, [T, T2, T3, T4]>;
export function withLatestFrom<T, T2, T3, T4, T5>(
  source2: AsyncIterable<T2>,
  source3: AsyncIterable<T3>,
  source4: AsyncIterable<T4>,
  source5: AsyncIterable<T5>
): OperatorAsyncFunction<T, [T, T2, T3, T4, T5]>;
export function withLatestFrom<T, T2, T3, T4, T5, T6>(
  source2: AsyncIterable<T2>,
  source3: AsyncIterable<T3>,
  source4: AsyncIterable<T4>,
  source5: AsyncIterable<T5>,
  source6: AsyncIterable<T6>
): OperatorAsyncFunction<T, [T, T2, T3, T4, T5, T6]>;
export function withLatestFrom<T>(...sources: AsyncIterable<T>[]): OperatorAsyncFunction<T, T[]> {
  // The rest arguments are captured once; every application of the operator reuses them.
  function withLatestFromOperatorFunction(source: AsyncIterable<T>) {
    return new WithLatestFromAsyncIterable<T>(source, sources);
  }

  return withLatestFromOperatorFunction;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment