@mattpodwysocki
Last active February 3, 2020 16:17
Adding basic cancellation to IxJS AsyncIterable
import { AsyncIterableX } from '../asynciterablex';
import { OperatorAsyncFunction } from '../../interfaces';
import { wrapWithAbort } from './withabort';

export class ConcatAllAsyncIterable<TSource> extends AsyncIterableX<TSource> {
  private _source: AsyncIterable<AsyncIterable<TSource>>;
  private _signal?: AbortSignal;

  constructor(source: AsyncIterable<AsyncIterable<TSource>>, signal?: AbortSignal) {
    super();
    this._source = source;
    this._signal = signal;
  }

  async *[Symbol.asyncIterator]() {
    for await (const outer of wrapWithAbort(this._source, this._signal)) {
      for await (const item of wrapWithAbort(outer, this._signal)) {
        yield item;
      }
    }
  }
}

export function concatAll<T>(signal?: AbortSignal): OperatorAsyncFunction<AsyncIterable<T>, T> {
  return function concatAllOperatorFunction(
    source: AsyncIterable<AsyncIterable<T>>
  ): AsyncIterableX<T> {
    return new ConcatAllAsyncIterable<T>(source, signal);
  };
}
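
import { AbortError } from "../util/aborterror";

export function delay(action: () => void, dueTime: number, signal?: AbortSignal) {
  return new Promise<void>((resolve, reject) => {
    // Reject up front if the signal was aborted before the timer is scheduled.
    if (signal?.aborted) {
      reject(new AbortError());
      return;
    }

    const id = setTimeout(() => {
      // A throw inside a timer callback would escape the promise, so reject instead.
      if (signal?.aborted) {
        reject(new AbortError());
        return;
      }
      try {
        action();
        resolve();
      } catch (e) {
        reject(e);
      }
    }, dueTime);

    if (signal) {
      // Cancel the pending timer as soon as the signal aborts.
      // Note: assigning onabort replaces any handler already set on the signal.
      signal.onabort = () => {
        clearTimeout(id);
        reject(new AbortError());
      };
    }
  });
}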

import { AbortError } from 'ix/util/aborterror';

export function sleep(dueTime: number, signal?: AbortSignal) {
  return new Promise<void>((resolve, reject) => {
    // Stop here if already aborted; otherwise a timer would still be scheduled.
    if (signal?.aborted) {
      reject(new AbortError());
      return;
    }

    const id = setTimeout(() => {
      if (signal?.aborted) {
        reject(new AbortError());
        return;
      }
      resolve();
    }, dueTime);

    if (signal) {
      // Cancel the pending timer as soon as the signal aborts.
      signal.onabort = () => {
        clearTimeout(id);
        reject(new AbortError());
      };
    }
  });
}

import { AsyncIterableX } from '../asynciterablex';
import { MonoTypeOperatorAsyncFunction } from '../../interfaces';
import { AbortError } from '../../util/aborterror';

export class WithAbortAsyncIterable<TSource> extends AsyncIterableX<TSource> {
  private _source: AsyncIterable<TSource>;
  private _signal: AbortSignal;

  constructor(source: AsyncIterable<TSource>, signal: AbortSignal) {
    super();
    this._source = source;
    this._signal = signal;
  }

  async *[Symbol.asyncIterator](): AsyncIterator<TSource, any, undefined> {
    if (this._signal.aborted) {
      throw new AbortError();
    }
    for await (const item of this._source) {
      if (this._signal.aborted) {
        throw new AbortError();
      }
      yield item;
    }
  }
}

export function wrapWithAbort<TSource>(
  source: AsyncIterable<TSource>,
  signal?: AbortSignal
): AsyncIterable<TSource> {
  return signal ? new WithAbortAsyncIterable(source, signal) : source;
}

export function withAbort<TSource>(signal: AbortSignal): MonoTypeOperatorAsyncFunction<TSource> {
  return function withAbortOperatorFunction(source: AsyncIterable<TSource>): AsyncIterableX<TSource> {
    return new WithAbortAsyncIterable<TSource>(source, signal);
  };
}
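
To see how the abort check behaves from the consumer's side, here is a minimal, self-contained sketch. It does not import IxJS: `AbortError` and `wrapWithAbort` below are simplified stand-ins (assumptions) that mirror the gist's logic as a plain async generator, and `numbers` and `takeUntilAborted` are hypothetical helpers for illustration only. It assumes a runtime with a global `AbortController`.

```typescript
// Hypothetical stand-in for the AbortError class imported in the gist.
class AbortError extends Error {
  constructor(message = 'The operation has been aborted') {
    super(message);
    this.name = 'AbortError';
  }
}

// Simplified stand-in for the gist's wrapWithAbort, written as an async generator.
async function* wrapWithAbort<T>(
  source: AsyncIterable<T>,
  signal?: AbortSignal
): AsyncGenerator<T> {
  if (signal?.aborted) {
    throw new AbortError();
  }
  for await (const item of source) {
    // The signal is only inspected between items, never while a value is pending.
    if (signal?.aborted) {
      throw new AbortError();
    }
    yield item;
  }
}

// Hypothetical endless source used to show that aborting stops the iteration.
async function* numbers(): AsyncGenerator<number> {
  let i = 0;
  while (true) {
    yield i++;
  }
}

async function takeUntilAborted(): Promise<number[]> {
  const controller = new AbortController();
  const seen: number[] = [];
  try {
    for await (const n of wrapWithAbort(numbers(), controller.signal)) {
      seen.push(n);
      if (n === 2) {
        controller.abort(); // cancel after the third item
      }
    }
  } catch (e) {
    // Swallow only the cancellation error; rethrow anything else.
    if ((e as Error).name !== 'AbortError') {
      throw e;
    }
  }
  return seen;
}
```

Because the check runs between items, a source that is stuck waiting on a slow value will not notice the abort until that value arrives.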
@felixfbecker

The sleep function should also add an event listener to the abort signal that calls clearTimeout() (but it needs to make sure to also remove the listener once it is no longer needed, to avoid leaking memory).

@mattpodwysocki
Author

@felixfbecker yep, I just realized that myself and just updated the gist

@felixfbecker

👍
I think it needs to use addEventListener() with { once: true } though, so it doesn't override another handler, and the listener should then be removed with removeEventListener(). This is also why it sucks that AbortSignal is just an EventTarget: the listener does not get cleaned up even after the signal was cancelled, and you need so much boilerplate for proper cleanup. A good example is here: https://github.com/sindresorhus/delay/blob/master/index.js
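
A minimal sketch of what that suggestion could look like for `sleep`, assuming a runtime with global `AbortController`/`AbortSignal`. The `AbortError` class here is a hypothetical stand-in for the one imported in the gist, and this is one possible shape rather than the code that ended up in IxJS.

```typescript
// Hypothetical stand-in for the AbortError class imported in the gist.
class AbortError extends Error {
  constructor(message = 'The operation has been aborted') {
    super(message);
    this.name = 'AbortError';
  }
}

// Abortable sleep that never assigns signal.onabort, so other handlers survive.
function sleep(dueTime: number, signal?: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal?.aborted) {
      reject(new AbortError());
      return;
    }

    // Runs at most once when the signal aborts: cancel the timer and reject.
    const onAbort = () => {
      clearTimeout(id);
      reject(new AbortError());
    };

    const id = setTimeout(() => {
      // The timer won the race: detach the listener so the signal does not retain it.
      signal?.removeEventListener('abort', onAbort);
      resolve();
    }, dueTime);

    // `once: true` removes the listener automatically after it has fired.
    signal?.addEventListener('abort', onAbort, { once: true });
  });
}
```

The two cleanup paths cover both outcomes: `{ once: true }` handles the abort case, and the explicit `removeEventListener()` handles the case where the timer completes on a long-lived signal.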