Last active
February 3, 2020 16:17
-
-
Save mattpodwysocki/1c8fa56eaa1cf253c404354b32c08601 to your computer and use it in GitHub Desktop.
Adding basic cancellation to IxJS AsyncIterable
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { AsyncIterableX } from '../asynciterablex'; | |
import { OperatorAsyncFunction } from '../../interfaces'; | |
import { wrapWithAbort } from './withabort'; | |
/**
 * Async iterable that flattens a sequence of inner async iterables by
 * draining each inner sequence completely, in order, before moving on to
 * the next one.
 *
 * Both the outer sequence and every inner sequence are passed through
 * `wrapWithAbort` together with the optional signal supplied at construction.
 */
export class ConcatAllAsyncIterable<TSource> extends AsyncIterableX<TSource> {
  private readonly _source: AsyncIterable<AsyncIterable<TSource>>;
  private readonly _signal?: AbortSignal;

  /**
   * @param source Sequence whose elements are themselves async iterables.
   * @param signal Optional abort signal forwarded to `wrapWithAbort`.
   */
  constructor(source: AsyncIterable<AsyncIterable<TSource>>, signal?: AbortSignal) {
    super();
    this._source = source;
    this._signal = signal;
  }

  /** Yields every element of every inner sequence, one inner sequence at a time. */
  async *[Symbol.asyncIterator]() {
    for await (const innerSequence of wrapWithAbort(this._source, this._signal)) {
      for await (const element of wrapWithAbort(innerSequence, this._signal)) {
        yield element;
      }
    }
  }
}
/**
 * Builds an operator that concatenates all inner async iterables of a source
 * sequence into a single flat sequence.
 *
 * @param signal Optional abort signal handed to the resulting
 * `ConcatAllAsyncIterable`.
 * @returns A function that wraps a source of async iterables in a
 * `ConcatAllAsyncIterable`.
 */
export function concatAll<T>(signal?: AbortSignal): OperatorAsyncFunction<AsyncIterable<T>, T> {
  function concatAllOperatorFunction(source: AsyncIterable<AsyncIterable<T>>): AsyncIterableX<T> {
    return new ConcatAllAsyncIterable<T>(source, signal);
  }

  return concatAllOperatorFunction;
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { AbortError } from "../util/aborterror"; | |
/**
 * Runs `action` once after `dueTime` milliseconds, with optional cancellation.
 *
 * @param action Callback invoked when the timer fires.
 * @param dueTime Delay in milliseconds passed to `setTimeout`.
 * @param signal Optional abort signal. If it is already aborted the promise
 * rejects immediately and no timer is scheduled; if it aborts while the timer
 * is pending, the timer is cleared and the promise rejects.
 * @returns A promise that resolves after `action` has run, rejects with
 * whatever `action` throws, or rejects with a new `AbortError` on cancellation.
 */
export function delay(action: () => void, dueTime: number, signal?: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal?.aborted) {
      reject(new AbortError());
      return;
    }

    // Registered with addEventListener (not `signal.onabort = ...`) so that a
    // handler the caller already installed on the signal is left untouched.
    const onAbort = () => {
      clearTimeout(id);
      reject(new AbortError());
    };

    const id = setTimeout(() => {
      // The timer won the race: detach the listener so a long-lived signal
      // does not keep this closure alive.
      signal?.removeEventListener("abort", onAbort);
      try {
        action();
        resolve();
      } catch (e) {
        reject(e);
      }
    }, dueTime);

    // `once` drops the listener automatically after an abort has been handled.
    signal?.addEventListener("abort", onAbort, { once: true });
  });
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { AbortError } from 'ix/util/aborterror'; | |
/**
 * Returns a promise that resolves after `dueTime` milliseconds, with optional
 * cancellation.
 *
 * @param dueTime Delay in milliseconds passed to `setTimeout`.
 * @param signal Optional abort signal. If it is already aborted the promise
 * rejects immediately and no timer is scheduled; if it aborts while the timer
 * is pending, the timer is cleared and the promise rejects.
 * @returns A promise that resolves when the timer fires, or rejects with a new
 * `AbortError` on cancellation.
 */
export function sleep(dueTime: number, signal?: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal?.aborted) {
      reject(new AbortError());
      // Stop here so no timer is scheduled for an already-cancelled sleep.
      return;
    }

    // Registered with addEventListener (not `signal.onabort = ...`) so that a
    // handler the caller already installed on the signal is left untouched.
    const onAbort = () => {
      clearTimeout(id);
      reject(new AbortError());
    };

    const id = setTimeout(() => {
      // The timer won the race: detach the listener so a long-lived signal
      // does not keep this closure alive.
      signal?.removeEventListener('abort', onAbort);
      resolve();
    }, dueTime);

    // `once` drops the listener automatically after an abort has been handled.
    signal?.addEventListener('abort', onAbort, { once: true });
  });
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { AsyncIterableX } from '../asynciterablex'; | |
import { MonoTypeOperatorAsyncFunction } from '../../interfaces'; | |
import { AbortError } from '../../util/aborterror'; | |
/**
 * Async iterable that re-yields the elements of a source sequence while
 * observing an abort signal.
 *
 * The signal is inspected once before iteration starts and again each time the
 * source produces an element; if it is aborted at either point an
 * `AbortError` is thrown instead of yielding.
 */
export class WithAbortAsyncIterable<TSource> extends AsyncIterableX<TSource> {
  private readonly _source: AsyncIterable<TSource>;
  private readonly _signal: AbortSignal;

  /**
   * @param source Sequence to iterate.
   * @param signal Abort signal checked during iteration.
   */
  constructor(source: AsyncIterable<TSource>, signal: AbortSignal) {
    super();
    this._source = source;
    this._signal = signal;
  }

  /** Yields each source element, throwing `AbortError` once the signal is aborted. */
  async *[Symbol.asyncIterator](): AsyncIterator<TSource, any, undefined> {
    const throwIfAborted = () => {
      if (this._signal.aborted) {
        throw new AbortError();
      }
    };

    throwIfAborted();
    for await (const element of this._source) {
      // Re-check after every element so an abort that happened while waiting
      // on the source is surfaced before the element is handed out.
      throwIfAborted();
      yield element;
    }
  }
}
/**
 * Wraps `source` in a `WithAbortAsyncIterable` when a signal is provided.
 *
 * @param source Sequence to wrap.
 * @param signal Optional abort signal.
 * @returns `source` itself when no signal is given, otherwise a new
 * `WithAbortAsyncIterable` over `source` and `signal`.
 */
export function wrapWithAbort<TSource>(source: AsyncIterable<TSource>, signal?: AbortSignal): AsyncIterable<TSource> {
  if (!signal) {
    return source;
  }
  return new WithAbortAsyncIterable(source, signal);
}
/**
 * Builds an operator that makes a source sequence observe an abort signal.
 *
 * @param signal Abort signal handed to the resulting `WithAbortAsyncIterable`.
 * @returns A function that wraps a source in a `WithAbortAsyncIterable`.
 */
export function withAbort<TSource>(signal: AbortSignal): MonoTypeOperatorAsyncFunction<TSource> {
  function withAbortOperatorFunction(source: AsyncIterable<TSource>): AsyncIterableX<TSource> {
    return new WithAbortAsyncIterable<TSource>(source, signal);
  }

  return withAbortOperatorFunction;
}
@felixfbecker yep, I just realized that myself and just updated the gist
👍
I think it needs to use addEventListener()
with { once: true }
though, so it doesn't override another handler, and then be removed with removeEventListener()
. This is also why it sucks that AbortSignal
is just an EventTarget
- the listener does not get cleaned up even after the signal was cancelled, and you need so much boilerplate for proper cleanup. A good example is here: https://github.com/sindresorhus/delay/blob/master/index.js
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The
sleep
function should also add an event listener to the abort signal that calls clearTimeout()
(but needs to make sure to also remove the listener if not needed anymore to avoid leaking memory)