Last active
April 28, 2020 15:12
-
-
Save mattpodwysocki/844032c05b21fdba45e8771128e62331 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { AsyncIterableX } from '../asynciterablex'; | |
import { OperatorAsyncFunction } from '../../interfaces'; | |
import { wrapWithAbort } from './withabort'; | |
import { throwIfAborted } from '../../aborterror'; | |
import { identity } from '../../util/identity'; | |
// eslint-disable-next-line @typescript-eslint/no-empty-function | |
const NEVER_PROMISE = new Promise(() => {}); | |
type MergeResult<T> = { value: T; index: number }; | |
function wrapPromiseWithIndex<T>(promise: Promise<T>, index: number) { | |
return promise.then((value) => ({ value, index })) as Promise<MergeResult<T>>; | |
} | |
export class WithLatestFromAsyncIterable<TSource> extends AsyncIterableX<TSource[]> { | |
private _source: AsyncIterable<TSource>; | |
private _others: AsyncIterable<TSource>[]; | |
constructor(source: AsyncIterable<TSource>, others: AsyncIterable<TSource>[]) { | |
super(); | |
this._source = source; | |
this._others = others; | |
} | |
async *[Symbol.asyncIterator](signal?: AbortSignal) { | |
throwIfAborted(signal); | |
const length = this._others.length; | |
const newLength = length + 1; | |
const iterators = new Array<AsyncIterator<TSource>>(newLength); | |
const nexts = new Array<Promise<MergeResult<IteratorResult<TSource>>>>(newLength); | |
let hasValueAll = false; | |
const hasValue = new Array(length); | |
const values = new Array(length); | |
hasValue.fill(false); | |
for (let i = 0; i < length; i++) { | |
const iterator = wrapWithAbort(this._others[i], signal)[Symbol.asyncIterator](); | |
iterators[i] = iterator; | |
nexts[i] = wrapPromiseWithIndex(iterator.next(), i); | |
} | |
const it = wrapWithAbort(this._source, signal)[Symbol.asyncIterator](); | |
nexts[length] = wrapPromiseWithIndex(it.next(), length); | |
for (;;) { | |
const next = Promise.race(nexts); | |
const { | |
value: { value: value$, done: done$ }, | |
index, | |
} = await next; | |
if (index === length) { | |
if (done$) { | |
break; | |
} | |
if (hasValueAll) { | |
yield [value$,...values]; | |
} | |
} else if (done$) { | |
nexts[index] = <Promise<MergeResult<IteratorResult<TSource>>>>NEVER_PROMISE; | |
} else { | |
values[index] = value$; | |
hasValue[index] = true; | |
hasValueAll = hasValue.every(identity); | |
const iterator$ = iterators[index]; | |
nexts[index] = wrapPromiseWithIndex(iterator$.next(), index); | |
} | |
} | |
} | |
} | |
export function withLatestFrom<T, T2>( | |
source2: AsyncIterable<T2> | |
): OperatorAsyncFunction<T, [T, T2]>; | |
export function withLatestFrom<T, T2, T3>( | |
source2: AsyncIterable<T2>, | |
source3: AsyncIterable<T3> | |
): OperatorAsyncFunction<T, [T, T2, T3]>; | |
export function withLatestFrom<T, T2, T3, T4>( | |
source2: AsyncIterable<T2>, | |
source3: AsyncIterable<T3>, | |
source4: AsyncIterable<T4> | |
): OperatorAsyncFunction<T, [T, T2, T3, T4]>; | |
export function withLatestFrom<T, T2, T3, T4, T5>( | |
source2: AsyncIterable<T2>, | |
source3: AsyncIterable<T3>, | |
source4: AsyncIterable<T4>, | |
source5: AsyncIterable<T5> | |
): OperatorAsyncFunction<T, [T, T2, T3, T4, T5]>; | |
export function withLatestFrom<T, T2, T3, T4, T5, T6>( | |
source2: AsyncIterable<T2>, | |
source3: AsyncIterable<T3>, | |
source4: AsyncIterable<T4>, | |
source5: AsyncIterable<T5>, | |
source6: AsyncIterable<T6> | |
): OperatorAsyncFunction<T, [T, T2, T3, T4, T5, T6]>; | |
export function withLatestFrom<T>(...sources: AsyncIterable<T>[]): OperatorAsyncFunction<T, T[]> { | |
return function withLatestFromOperatorFunction(source: AsyncIterable<T>) { | |
return new WithLatestFromAsyncIterable<T>(source, sources); | |
}; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment