Skip to content

Instantly share code, notes, and snippets.

@mattpodwysocki
Last active March 3, 2020 00:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattpodwysocki/5dcdbcaee565b95968b69a97edfd20ea to your computer and use it in GitHub Desktop.
Save mattpodwysocki/5dcdbcaee565b95968b69a97edfd20ea to your computer and use it in GitHub Desktop.
/**
 * Error raised when an operation is cancelled through an `AbortSignal`.
 *
 * The instance carries the fixed message `'The operation has been aborted'`
 * and reports its `name` as `'AbortError'`, so callers can recognise it either
 * with `instanceof AbortError` or by inspecting `err.name`.
 */
export class AbortError extends Error {
  constructor() {
    // Hand the message to the base constructor so it is in place when the
    // error object (and its stack header) is created.
    super('The operation has been aborted');
    // Re-attach the prototype explicitly: when TypeScript down-levels to ES5,
    // subclasses of built-ins such as Error otherwise lose their prototype
    // chain and `instanceof AbortError` fails.
    Object.setPrototypeOf(this, AbortError.prototype);
    // Without this the inherited name stays the generic "Error".
    this.name = 'AbortError';
  }
}
/**
 * Guard that raises an `AbortError` when the supplied signal has already
 * been aborted.
 *
 * @param signal Optional abort signal; when omitted the call is a no-op.
 * @throws AbortError if `signal` is provided and its `aborted` flag is truthy.
 */
export function throwIfAborted(signal?: AbortSignal) {
  const alreadyAborted = signal != null && signal.aborted;
  if (!alreadyAborted) {
    return;
  }
  throw new AbortError();
}
import { throwIfAborted } from '../aborterror';
import { wrapWithAbort } from './operators/withabort';
/**
 * Counts the elements of an async-iterable that satisfy a predicate.
 *
 * @param source Async-iterable whose elements are examined.
 * @param fn Predicate, synchronous or asynchronous, applied to every element.
 *   The default accepts everything, so the total number of elements is returned.
 * @param signal Optional abort signal. It is checked once via `throwIfAborted`
 *   before iteration starts and is also handed to `wrapWithAbort` together
 *   with the source.
 * @returns A promise for the number of elements for which `fn` resolved to a
 *   truthy value.
 */
export async function count<T>(
  source: AsyncIterable<T>,
  fn: (value: T) => boolean | Promise<boolean> = async () => true,
  signal?: AbortSignal
): Promise<number> {
  throwIfAborted(signal);

  const iterable = wrapWithAbort(source, signal);
  let matches = 0;
  for await (const value of iterable) {
    const accepted = await fn(value);
    if (accepted) {
      matches += 1;
    }
  }
  return matches;
}
import { AsyncIterableX } from '../asynciterablex';
import { OperatorAsyncFunction } from '../../interfaces';
import { wrapWithAbort } from './withabort';
/**
 * Async-iterable that projects every element of a source sequence through a
 * selector function and yields the awaited results in order.
 */
export class MapAsyncIterable<TSource, TResult> extends AsyncIterableX<TResult> {
  /** Sequence whose elements are projected. */
  private readonly _source: AsyncIterable<TSource>;
  /** Projection invoked with the element, its zero-based index and the signal. */
  private readonly _selector: (value: TSource, index: number, signal?: AbortSignal) => Promise<TResult> | TResult;
  /** Value used as `this` when the selector is invoked. */
  private readonly _thisArg: any;

  /**
   * @param source Sequence to project.
   * @param selector Projection applied to each element; may return a promise.
   * @param thisArg Optional `this` binding for `selector`.
   */
  constructor(
    source: AsyncIterable<TSource>,
    selector: (value: TSource, index: number, signal?: AbortSignal) => Promise<TResult> | TResult,
    thisArg?: any
  ) {
    super();
    this._thisArg = thisArg;
    this._selector = selector;
    this._source = source;
  }

  /**
   * Iterates the source (wrapped with the signal when one is given) and yields
   * each projected value.
   *
   * @param signal Optional abort signal, forwarded both to `wrapWithAbort` and
   *   to every selector call.
   */
  async *[Symbol.asyncIterator](signal?: AbortSignal) {
    const upstream = wrapWithAbort(this._source, signal);
    let index = 0;
    for await (const value of upstream) {
      yield await this._selector.call(this._thisArg, value, index, signal);
      index += 1;
    }
  }
}
/**
 * Creates an operator that projects each element of an async-iterable through
 * `selector`.
 *
 * @param selector Projection receiving the element, its index and an optional
 *   abort signal; may return a value or a promise of one.
 * @param thisArg Optional `this` binding used when calling `selector`.
 * @returns A function that wraps a source sequence in a `MapAsyncIterable`.
 */
export function map<TSource, TResult>(
  selector: (value: TSource, index: number, signal?: AbortSignal) => Promise<TResult> | TResult,
  thisArg?: any
): OperatorAsyncFunction<TSource, TResult> {
  function mapOperatorFunction(source: AsyncIterable<TSource>): AsyncIterableX<TResult> {
    const projected = new MapAsyncIterable<TSource, TResult>(source, selector, thisArg);
    return projected;
  }

  return mapOperatorFunction;
}
import { AsyncIterableX } from '../asynciterablex';
import { MonoTypeOperatorAsyncFunction } from '../../interfaces';
/**
 * Async-iterable wrapper that binds an `AbortSignal` to a source sequence.
 *
 * Iterating the wrapper asks the source for its iterator and passes the stored
 * signal as an argument to the source's `[Symbol.asyncIterator]` method.
 * NOTE(review): the standard async-iteration protocol defines no such
 * argument, so whether the signal is honoured depends on the source's own
 * implementation.
 */
export class WithAbortAsyncIterable<TSource> extends AsyncIterableX<TSource> {
  /** Sequence that will receive the signal when iterated. */
  private readonly _source: AsyncIterable<TSource>;
  /** Signal handed to the source's iterator factory. */
  private readonly _signal: AbortSignal;

  /**
   * @param source Sequence to wrap.
   * @param signal Abort signal to pass to the source on iteration.
   */
  constructor(source: AsyncIterable<TSource>, signal: AbortSignal) {
    super();
    this._source = source;
    this._signal = signal;
  }

  /**
   * Returns a new wrapper around the same underlying source bound to a
   * different signal, instead of nesting this wrapper inside another one.
   *
   * @param signal Abort signal for the new wrapper.
   */
  withAbort(signal: AbortSignal) {
    return new WithAbortAsyncIterable<TSource>(this._source, signal);
  }

  /**
   * Obtains the source's iterator, supplying the stored signal.
   */
  [Symbol.asyncIterator](): AsyncIterator<TSource> {
    // View the source through a structural type whose iterator factory accepts
    // an optional signal. This allows exactly the extra argument while leaving
    // the rest of the expression type-checked, unlike a blanket suppression.
    const abortable: {
      [Symbol.asyncIterator](signal?: AbortSignal): AsyncIterator<TSource>;
    } = this._source;
    return abortable[Symbol.asyncIterator](this._signal);
  }
}
/**
 * Creates an operator that binds an abort signal to an async-iterable.
 *
 * @param signal Abort signal to attach to the source sequence.
 * @returns A function that wraps its source in a `WithAbortAsyncIterable`
 *   carrying `signal`.
 */
export function withAbort<TSource>(
  signal: AbortSignal,
): MonoTypeOperatorAsyncFunction<TSource> {
  function withAbortOperatorFunction(source: AsyncIterable<TSource>): AsyncIterableX<TSource> {
    const bound = new WithAbortAsyncIterable(source, signal);
    return bound;
  }

  return withAbortOperatorFunction;
}
/**
 * Conditionally binds an abort signal to a source sequence.
 *
 * @param source Sequence to iterate.
 * @param signal Optional abort signal.
 * @returns A `WithAbortAsyncIterable` around `source` when a signal is given;
 *   otherwise `source` itself, untouched.
 */
export function wrapWithAbort<TSource>(
  source: AsyncIterable<TSource>,
  signal?: AbortSignal
) {
  if (signal) {
    return new WithAbortAsyncIterable(source, signal);
  }
  return source;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment