Last active
March 3, 2020 00:58
-
-
Save mattpodwysocki/5dcdbcaee565b95968b69a97edfd20ea to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Error thrown when an operation is cancelled through an `AbortSignal`.
 *
 * The instance always carries the fixed message
 * `'The operation has been aborted'` and the name `'AbortError'`, so callers
 * can recognise it either with `instanceof AbortError` or by checking `name`.
 */
export class AbortError extends Error {
  constructor() {
    // Hand the message to the base constructor so it is already present when
    // the engine records the stack trace.
    super('The operation has been aborted');
    // Re-attach the subclass prototype; this keeps `instanceof AbortError`
    // working when the class is down-levelled to a target where extending
    // built-ins does not preserve the prototype chain.
    Object.setPrototypeOf(this, AbortError.prototype);
    this.name = 'AbortError';
  }
}
/**
 * Throws an `AbortError` when the supplied signal has already been aborted.
 *
 * @param signal Optional abort signal; when it is omitted nothing happens.
 * @throws AbortError if `signal.aborted` is truthy.
 */
export function throwIfAborted(signal?: AbortSignal) {
  const alreadyAborted = signal != null && signal.aborted;
  if (alreadyAborted) {
    throw new AbortError();
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { throwIfAborted } from '../aborterror'; | |
import { wrapWithAbort } from './operators/withabort'; | |
/**
 * Counts the elements of an async-iterable for which a predicate holds.
 *
 * `throwIfAborted(signal)` is called once before iteration starts, and the
 * source is iterated through `wrapWithAbort(source, signal)`.
 *
 * @param source Async-iterable whose elements are examined.
 * @param fn Predicate (sync or async) applied to each element; defaults to a
 *   predicate that accepts everything, so every element is counted.
 * @param signal Optional abort signal forwarded to the wrapped source.
 * @returns A promise resolving to the number of elements the predicate accepted.
 */
export async function count<T>(
  source: AsyncIterable<T>,
  fn: (value: T) => boolean | Promise<boolean> = async () => true,
  signal?: AbortSignal
): Promise<number> {
  throwIfAborted(signal);

  let matches = 0;
  for await (const element of wrapWithAbort(source, signal)) {
    const accepted = await fn(element);
    if (accepted) {
      matches += 1;
    }
  }
  return matches;
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { AsyncIterableX } from '../asynciterablex'; | |
import { OperatorAsyncFunction } from '../../interfaces'; | |
import { wrapWithAbort } from './withabort'; | |
/**
 * Async-iterable that projects every element of a source sequence through a
 * selector function.
 */
export class MapAsyncIterable<TSource, TResult> extends AsyncIterableX<TResult> {
  private _source: AsyncIterable<TSource>;
  private _selector: (
    value: TSource,
    index: number,
    signal?: AbortSignal
  ) => Promise<TResult> | TResult;
  private _thisArg: any;

  /**
   * @param source Sequence whose elements are transformed.
   * @param selector Projection invoked with the element, its zero-based
   *   index, and the abort signal given to the iterator (if any).
   * @param thisArg Value used as `this` when invoking `selector`.
   */
  constructor(
    source: AsyncIterable<TSource>,
    selector: (value: TSource, index: number, signal?: AbortSignal) => Promise<TResult> | TResult,
    thisArg?: any
  ) {
    super();
    this._source = source;
    this._selector = selector;
    this._thisArg = thisArg;
  }

  /**
   * Iterates the source via `wrapWithAbort` and yields the awaited selector
   * result for each element.
   *
   * @param signal Optional abort signal, passed both to the wrapped source
   *   and to the selector.
   */
  async *[Symbol.asyncIterator](signal?: AbortSignal) {
    let nextIndex = 0;
    for await (const value of wrapWithAbort(this._source, signal)) {
      // Post-increment: the selector sees the current index, then it advances.
      yield await this._selector.call(this._thisArg, value, nextIndex++, signal);
    }
  }
}
/**
 * Creates an operator that projects each element of an async-iterable through
 * `selector`.
 *
 * @param selector Projection invoked with the element, its index, and an
 *   optional abort signal; may return a value or a promise of one.
 * @param thisArg Optional `this` binding for `selector`.
 * @returns A function that wraps a source in a `MapAsyncIterable`.
 */
export function map<TSource, TResult>(
  selector: (value: TSource, index: number, signal?: AbortSignal) => Promise<TResult> | TResult,
  thisArg?: any
): OperatorAsyncFunction<TSource, TResult> {
  function mapOperatorFunction(source: AsyncIterable<TSource>): AsyncIterableX<TResult> {
    const projected = new MapAsyncIterable<TSource, TResult>(source, selector, thisArg);
    return projected;
  }

  return mapOperatorFunction;
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { AsyncIterableX } from '../asynciterablex'; | |
import { MonoTypeOperatorAsyncFunction } from '../../interfaces'; | |
/**
 * Async-iterable that binds an `AbortSignal` to a source sequence: whenever
 * it is iterated, the signal is passed as an argument to the source's
 * `Symbol.asyncIterator` method.
 */
export class WithAbortAsyncIterable<TSource> extends AsyncIterableX<TSource> {
  private _source: AsyncIterable<TSource>;
  private _signal: AbortSignal;

  /**
   * @param source Sequence to iterate.
   * @param signal Signal handed to the source's iterator factory.
   */
  constructor(source: AsyncIterable<TSource>, signal: AbortSignal) {
    super();
    this._source = source;
    this._signal = signal;
  }

  /**
   * Returns a new wrapper over the same underlying source bound to a
   * different signal (the current wrapper is not nested).
   */
  withAbort(signal: AbortSignal) {
    return new WithAbortAsyncIterable<TSource>(this._source, signal);
  }

  /**
   * Obtains an iterator from the source, supplying the bound signal.
   */
  [Symbol.asyncIterator](): AsyncIterator<TSource> {
    // The standard `AsyncIterable` signature takes no arguments. Viewing the
    // source through a type whose iterator factory accepts an optional signal
    // lets the call type-check without `@ts-ignore`; sources that declare no
    // parameter simply ignore the extra argument.
    const abortable: {
      [Symbol.asyncIterator](signal?: AbortSignal): AsyncIterator<TSource>;
    } = this._source;
    return abortable[Symbol.asyncIterator](this._signal);
  }
}
/**
 * Creates an operator that binds `signal` to a source async-iterable.
 *
 * @param signal Abort signal to associate with the source.
 * @returns A function that wraps its source in a `WithAbortAsyncIterable`.
 */
export function withAbort<TSource>(signal: AbortSignal): MonoTypeOperatorAsyncFunction<TSource> {
  function withAbortOperatorFunction(source: AsyncIterable<TSource>): AsyncIterableX<TSource> {
    const bound = new WithAbortAsyncIterable(source, signal);
    return bound;
  }

  return withAbortOperatorFunction;
}
/**
 * Binds an abort signal to a source when one is provided.
 *
 * @param source Async-iterable to wrap.
 * @param signal Optional abort signal.
 * @returns `source` itself when `signal` is falsy; otherwise a
 *   `WithAbortAsyncIterable` over `source` bound to `signal`.
 */
export function wrapWithAbort<TSource>(source: AsyncIterable<TSource>, signal?: AbortSignal) {
  if (!signal) {
    return source;
  }
  return new WithAbortAsyncIterable(source, signal);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment