Skip to content

Instantly share code, notes, and snippets.

@mattpodwysocki
Created March 24, 2020 02:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattpodwysocki/77e665ea38d97d5db0dcca489c258da4 to your computer and use it in GitHub Desktop.
Save mattpodwysocki/77e665ea38d97d5db0dcca489c258da4 to your computer and use it in GitHub Desktop.
import { ReduceOptions } from './reduceoptions';
import { wrapWithAbort } from './operators/withabort';
export async function reduce<T, R = T>(
source: AsyncIterable<T>,
options: ReduceOptions<T, R>
): Promise<R> {
const { seed, signal, callback } = options;
const hasSeed = options.hasOwnProperty('seed');
let i = 0;
let hasValue = false;
let acc = seed as T | R;
for await (const item of wrapWithAbort(source, signal)) {
if (hasValue || (hasValue = hasSeed)) {
acc = await callback(<R>acc, item, i++, signal);
} else {
acc = item;
hasValue = true;
i++;
}
}
if (!(hasSeed || hasValue)) {
throw new Error('Sequence contains no elements');
}
return acc as R;
}
import { AbortSignal } from '../abortsignal';
import { ScanOptions } from './operators/scanoptions';
export interface ReduceOptions<T, R> extends ScanOptions<T, R> {
signal?: AbortSignal;
}
import { AbortSignal } from '../../abortsignal';
import { AsyncIterableX } from '../asynciterablex';
import { OperatorAsyncFunction } from '../../interfaces';
import { wrapWithAbort } from './withabort';
import { ScanOptions } from './scanoptions';
export class ScanAsyncIterable<T, R> extends AsyncIterableX<R> {
private _source: AsyncIterable<T>;
private _fn: (acc: R, x: T, index: number, signal?: AbortSignal) => R | Promise<R>;
private _seed?: T | R;
private _hasSeed: boolean;
constructor(
source: AsyncIterable<T>,
options: ScanOptions<T, R>
) {
super();
this._source = source;
this._fn = options.callback;
this._hasSeed = options.hasOwnProperty('seed');
this._seed = options.seed;
}
async *[Symbol.asyncIterator](signal?: AbortSignal) {
let i = 0;
let hasValue = false;
let acc = this._seed;
for await (const item of wrapWithAbort(this._source, signal)) {
if (hasValue || (hasValue = this._hasSeed)) {
acc = await this._fn(<R>acc, item, i++, signal);
yield acc;
} else {
acc = item;
hasValue = true;
i++;
}
}
if (i === 1 && !this._hasSeed) {
yield acc as R;
}
}
}
export function scan<T, R = T>(
options: ScanOptions<T, R>
): OperatorAsyncFunction<T, R> {
return function scanOperatorFunction(source: AsyncIterable<T>): AsyncIterableX<R> {
return new ScanAsyncIterable(source, options);
};
}
import { AbortSignal } from '../../abortsignal';
export interface ScanOptions<T, R> {
seed?: R;
callback: (accumulator: R, current: T, index: number, signal?: AbortSignal) => R | Promise<R>;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment