Skip to content

Instantly share code, notes, and snippets.

@rixtox
Last active June 12, 2023 09:46
Show Gist options
  • Save rixtox/907ec9026733d26683973cba48bf5e75 to your computer and use it in GitHub Desktop.
Save rixtox/907ec9026733d26683973cba48bf5e75 to your computer and use it in GitHub Desktop.
namespace type {
export interface Lifecycle extends LifeMaker {
/**
* Run a task while extending the lifetime of the Lifecycle to cover the
* duration of the task. Teardown functions added to this Lifecycle from
* the task are deferred to the Dying stage of the Lifecycle, joining
* with teardowns registered from all the tasks.
*
* It returns a rejected Promise immediately without executing the sub
* task if the Lifecycle has already entered Dying stage.
*
* @param task is a task to be executed on the Lifecycle.
* @param args are arguments to the task, omitting the first parameter
* which will be filled-in with the current Lifecycle object.
*
* @returns a Promise that fulfills immediately after the task fulfills.
* If an error was thrown from the task, the Promise would reject with
* that error immediately. Note that this Promise doesn't wait for any
* teardown tasks registered with the {@link Lifecycle.defer} function
* to fulfill.
*/
extend<Return>(task: PromiseLike<Return>): Promise<Return>;
extend<Return, Args extends any[]>(
task: (...args: Args) => Promise<Return>,
...args: Args
): Promise<Return>;
/**
* Register a teardown callback to be called when the Lifecycle has
* entered the Dying stage. That is, when all tasks have been fulfilled.
* The scheduling order of the teardown callbacks is Last-In-First-Out.
*
* @remarks
*
* Since teardown callbacks are scheduled in the Dying stage of the
* Lifecycle, if you want to call another LifeFunction inside the
* teardown function, you cannot use the dying Lifecycle anymore.
* Therefore, the first argument passed to the teardown callback is the
* parent Lifecycle of the dying Lifecycle, which is guaranteed to be
* alive. So you should write something like:
*
* life.defer((life, err) => reportTeardown(life, err));
*
* @param callback The deferred callback to be called at Lifecycle's
* Dying stage. The first parameter passed to the callback is the parent
* Lifecycle of the dying Lifecycle. The second parameter is a
* {@link LifecycleDyingError} if the first task or any teardown
* callbacks have thrown an error, with the
* {@link LifecycleDyingError.error} field being the thrown error value.
* Otherwise, the second parameter is `undefined`.
*/
defer(callback: LifeTeardown): void;
/**
* Register a callback function to be called immediately when the
* Lifecycle is aborted. It ensures proper register and deregister of
* the callback, and extend the lifetime of the Lifecycle accordingly.
*
* @remarks
*
* Note that if an abort signal came in after the Lifecycle entered
* Dying stage, the callback will not be called.
*
* @param callback Function to be called when the Lifecycle is aborted
*/
onAbort(callback: LifeAbortCallback): void;
/**
* Bind a {@link Disposable} or {@link AsyncDisposable} resource to the
* Lifecycle. The Lifecycle will call the resource's dispose method in
* the Dying stage. Check
* {@link https://github.com/tc39/proposal-explicit-resource-management | the Disposable contract}
* for more information.
*
* @param disposable a {@link Disposable} or {@link AsyncDisposable}
* resource
*/
use(disposable: Disposable | AsyncDisposable): void;
/**
* The AbortSignal set on the Lifecycle. {@link Lifecycle.onAbort}
* should be preferred for adding callbacks reacting to the AbortSignal.
* This property is convenient for passing to supported methods like
* {@link https://developer.mozilla.org/en-US/docs/Web/API/Fetch_API/Using_Fetch#aborting_a_fetch | fetch}.
*/
signal: AbortSignal;
}
export interface LifeMaker {
/**
* Start a new Lifecycle and extend the lifetime of the parent Lifecycle
* to the lifetime of the child Lifecycle. The abort signal from the
* parent will propagate and cancel the child Lifecycle. External abort
* signal can be mixed-in with {@link Lifecycle.withAbort}.
*
* @remarks
*
* The parent Lifecycle will hold onto its resources until the child
* Lifecycle finishes. It provides a safer default behavior when using
* the Lifecycle contract.
*
* If you want to opt out from this behavior, you have to first make
* sure your task doesn't use resources bound to the parent Lifecycle
* after the parent Lifecycle dies. Then you can spawn from a higher
* level Lifecycle, for example, from the root Lifecycle:
*
* Lyfe.withSignal(parent.signal).spawn(task, ...args)
*
* It starts a new Lifecycle without extending the parent Lifecycle, but
* get killed when the parent Lifecycle get killed. Or you can write
*
* Lyfe.spawn(task, ...args)
*
* to start a completely detached Lifecycle. These use cases are unsafe
* and possible to make use-after-free mistakes. Please spawn from an
* enclosing Lifecycle whenever possible.
*
* @param task is the initial task to be executed on the new Lifecycle
* @param args are arguments to the task, omitting the first parameter
* which will be filled-in with the current Lifecycle object.
*
* @returns a Promise that fulfills to the task's result after the new
* Lifecycle finishes.
*/
spawn<Return, Args extends any[]>(
task: LifeFunction<Return, Args>,
...args: Args
): Promise<Return>;
/**
* @experimental
* THIS IS AN EXPERIMENTAL AND UNSAFE FUNCTION! AVOID IF POSSIBLE!
*
* Spawn a child Lifecycle while transforming the Lifecycle contract
* from declarative stack passing style into imperative manual release
* style using
* {@link https://github.com/tc39/proposal-async-explicit-resource-management | the AsyncDisposable contract}.
*
* {@link Disposable} objects are leaky by default. You have to bind it
* to a {@link DisposableStack} or {@link AsyncDisposableStack} or to a
* block scope with the `using` syntax. Failing to do so would result in
* resource not being able to ever clean up and leak.
*
* @param task is the initial task to be executed on the new Lifecycle
* @param args are arguments to the task, omitting the first parameter
* which will be filled-in with the new Lifecycle object.
*
* @returns an {@link AsyncDisposable} to retain the resources of the
* child Lifecycle indefinitely until the dispose method was called. IT
* WILL LEAK IF YOU DON'T CALL THE RELEASE HANDLE!
*/
unsafeSpawnAsyncDisposable<Return extends object, Args extends any[]>(
task: LifeFunctionSync<Return, Args>,
...args: Args
): AsyncDisposableEnabled<Return>;
/**
* Use an external {@link AbortController} or {@link AbortSignal} to
* cancel the new Lifecycle and all descendant Lifecycles spawned from
* it. Aborting a Lifecycle is optimistic. The exact behavior depends on
* the implementation of the tasks running on that Lifecycle. Reacting
* to the abort signal is an opt-in option as a task author. If a task
* doesn't react to abort signal, it will continue execution normally.
*
* @param abort an external {@link AbortController} or
* {@link AbortSignal} to cancel the new Lifecycle
*
* @returns a new {@link LifeMaker} factory object
*/
withAbort(abort?: AbortController | AbortSignal): LifeMaker;
}
}
namespace lyfe {
class LifeMaker implements type.LifeMaker {
constructor(
protected _self?: Lifecycle,
protected abort?: AbortController | AbortSignal
) {}
public spawn<Return, Args extends any[]>(
task: LifeFunction<Return, Args>,
...args: Args
): Promise<Return> {
const child = new Lifecycle(this.self, this.abort);
const promise = this.start(child, task, ...args);
if (!this.self) return promise;
return this.self.extend(promise);
}
protected async start<Return, Args extends any[]>(
life: Lifecycle,
task: LifeFunction<Return, Args>,
...args: Args
): Promise<Return> {
try {
life._incrementTask();
return await task(life, ...args);
} catch (error) {
life._setError(error);
throw error;
} finally {
life._decrementTask();
await life._dying;
await life._finalize();
}
}
public unsafeSpawnAsyncDisposable<
Return extends object,
Args extends any[]
>(
task: LifeFunctionSync<Return, Args>,
...args: Args
): AsyncDisposableEnabled<Return> {
let life!: type.Lifecycle;
let done!: () => void;
let die!: (reason: any) => void;
const alive = new Promise<void>((r, j) => ((done = r), (die = j)));
const death = this.spawn((l) => ((life = l), alive));
life.onAbort((_, reason) => die(reason));
try {
const result = task(life, ...args);
return Object.assign(result, {
[asyncDisposeSymbol()]() {
done();
return death;
},
});
} catch (e) {
die(e);
throw e;
}
}
public withAbort(abort?: AbortController | AbortSignal): LifeMaker {
return new LifeMaker(this.self, abort);
}
protected get self(): Lifecycle | undefined {
return this._self;
}
}
export class Lifecycle extends LifeMaker implements type.Lifecycle {
/**
* Create the root Lifecycle that never dies.
*
* @returns the root Lifecycle
*/
public static root(): type.Lifecycle {
const life = new Lifecycle();
// extend its lifetime indefinitely
life.extend(() => new Promise<void>(() => {}));
return life;
}
public extend<Return>(task: PromiseLike<Return>): Promise<Return>;
public extend<Return, Args extends any[]>(
task: (...args: Args) => Promise<Return>,
...args: Args
): Promise<Return>;
public async extend<Return, Args extends any[]>(
task: PromiseLike<Return> | ((...args: Args) => Promise<Return>),
...args: Args
): Promise<Return> {
if (this._stage >= Stage.Dying) {
throw new LifecycleDyingError(this._error);
}
try {
this._incrementTask();
if (typeof task === "function") {
// task is using the LifeFunction contract, so the task
// function takes the Lifecycle as its first argument.
// Passing Lifecycle this way enables the full potential of
// using the Lifecycle contract, such as registering
// teardown tasks with defer().
return await task(...args);
} else {
// If task is a plain Promise object not implementing the
// Lifecycle contract, then there's no way to propagate the
// kill signal to the task. We can only extend the current
// Lifecycle to the end of this task Promise. This is the
// maximum interoperability we can have with plain Promise
// contract.
return await task;
}
} finally {
this._decrementTask();
if (this._hasError) {
throw this._error;
}
}
}
public defer(callback?: LifeTeardown): void {
if (!callback) return;
if (this._stage > Stage.Dying) {
throw new LifecycleDyingError(this._error);
}
this._teardowns.push(callback);
}
public onAbort(callback: LifeAbortCallback): void {
const abort = async () => {
try {
await this.extend(
callback(this.parent!, this.signal.reason)
);
} catch (err) {
this._setError(err);
}
};
if (this.signal.aborted) {
return void abort();
}
this.signal.addEventListener("abort", abort);
this.defer(() => this.signal.removeEventListener("abort", abort));
}
public use(disposable?: Disposable | AsyncDisposable): void {
if (!disposable) return;
if ((disposable as AsyncDisposable)[asyncDisposeSymbol()]) {
this.defer(() =>
(disposable as AsyncDisposable)[asyncDisposeSymbol()]()
);
} else if ((disposable as Disposable)[disposeSymbol()]) {
this.defer(() => (disposable as Disposable)[disposeSymbol()]());
}
}
/**
* Internal constructor for making an empty Lifecycle.
*/
constructor(
private parent?: Lifecycle,
abort?: AbortController | AbortSignal
) {
super();
this._dying = new Promise((r) => (this._startDying = r));
const { signal, teardown } = mixAbort(parent?.signal, abort);
this.signal = signal;
this.defer(teardown);
}
public signal: AbortSignal;
protected get self(): Lifecycle {
return this;
}
/**
* Transition the Lifecycle into Dying stage, and perform teardown tasks.
* Errors thrown from a teardown task will be set as the pending error for
* the Lifecycle Promise, if no other errors have been set.
*/
public async _finalize(): Promise<void> {
this._stage = Stage.Dying;
while (this._teardowns.length) {
const task = this._teardowns.pop()!;
try {
const error = this._hasError
? new LifecycleDyingError(this._error)
: undefined;
await task(this.parent!, error);
} catch (error) {
this._setError(error);
}
}
if (this._hasError) {
throw this._error;
}
}
/**
* Set an error on the Lifecycle and transition to Dying stage.
*
* @param error the error to be set
*/
public _setError(error: any): void {
if (!this._hasError) {
this._error = error;
this._hasError = true;
this._stage = Stage.Dying;
}
}
public _incrementTask(): void {
++this._tasks;
}
public _decrementTask(): void {
if (--this._tasks === 0) {
this._startDying();
}
}
/**
* The number of running tasks.
*/
private _tasks = 0;
/**
* Promise that resolves when all running tasks are finished.
*/
public _dying!: Promise<void>;
/**
* Resolve callback to be called when all running tasks are finished.
*/
private _startDying!: () => void;
/**
* The error value. Either thrown from the first task or any teardown
* callbacks. Whichever happened first will be set on this `_error`
* value and subsequent attempt to set it will be ignored.
*/
private _error: any = undefined;
/**
* True if _error has been set.
*/
private _hasError = false;
/**
* The Lifecycle stage.
*/
private _stage = Stage.Alive;
/**
* Array of teardown callbacks.
*/
private _teardowns: Array<LifeTeardown> = [];
}
const enum Stage {
Alive,
Dying,
}
}
/**
* The Lifecycle interface.
*/
export type Lifecycle = type.Lifecycle;
/**
* The root Lifecycle that never dies.
*/
export const Lyfe: RootLifecycle = lyfe.Lifecycle.root();
export type RootLifecycle = type.LifeMaker;
/**
* A LifeFunction define the contract that when the function is executed on a
* Lifecycle, the Lifecycle is passed as the first argument to this function.
*/
export type LifeFunction<Return = any, Args extends any[] = any[]> = (
life: Lifecycle,
...rest: Args
) => Promise<Return> | Return;
export type LifeFunctionSync<Return = any, Args extends any[] = any[]> = (
life: Lifecycle,
...rest: Args
) => Return;
/**
* The teardown function is to be invoked in the Dying stage of the Lifecycle,
* after all tasks have finished. If all tasks completed without error, the
* teardown function will be invoked with no arguments. If any of the previous
* tasks or teardown functions thrown an error, the next teardown function will
* be invoked with a LifecycleDyingError with the actual error wrapped inside.
*/
export type LifeTeardown = (
parent: Lifecycle,
error?: LifecycleDyingError
) => Promise<any> | any;
/**
* A callback function to be invoked when the Lifecycle is aborted.
*/
export type LifeAbortCallback = (
parent: Lifecycle,
reason: any
) => Promise<any> | any;
/**
* An Error class to indicate that an error was generated or forwarded from a
* Lifecycle after the Dying stage. Inspect the
* {@link LifecycleDyingError.error} field for the original error.
*/
export class LifecycleDyingError extends Error {
constructor(public error?: any) {
super("Lifecycle already dying");
}
}
/**
* Returns an AbortSignal that never aborts.
*/
export const NeverAbort = (() => {
let signal!: AbortSignal;
return (): AbortSignal => {
return signal || (signal = new AbortController().signal);
};
})();
export interface MixedAbortSignal {
signal: AbortSignal;
teardown?: () => void;
}
export function mixAbort(
parent?: AbortSignal,
abort?: AbortSignal | AbortController
): MixedAbortSignal {
if (!abort) {
if (parent) return { signal: parent };
else return { signal: NeverAbort() };
}
if (parent === abort) return { signal: parent };
const control = isAbortController(abort) ? abort : new AbortController();
const { signal } = control;
const derived: MixedAbortSignal = { signal };
if (abort instanceof AbortSignal) {
const onAbort = () => control.abort(abort.reason);
derived.teardown = () =>
abort.removeEventListener("abort", onAbort);
abort.addEventListener("abort", onAbort);
}
if (!parent) return derived;
const innerTeardown = derived.teardown;
const onAbort = () => control.abort(parent.reason);
derived.teardown = () => {
parent.removeEventListener("abort", onAbort);
innerTeardown && innerTeardown();
};
parent.addEventListener("abort", onAbort);
return derived;
}
function isAbortController(abort: unknown): abort is AbortController {
return !!(
abort &&
typeof (abort as AbortController).abort === "function" &&
(abort as AbortController).signal instanceof AbortSignal
);
}
export const disposeSymbol = (): SymbolConstructor["dispose"] => {
return (
Symbol.dispose ||
((Symbol as any).dispose = Symbol("Symbol.dispose"))
);
};
export const asyncDisposeSymbol = (): SymbolConstructor["asyncDispose"] => {
return (
Symbol.asyncDispose ||
((Symbol as any).asyncDispose = Symbol("Symbol.asyncDispose"))
);
};
export type AsyncDisposableEnabled<T> = T & AsyncDisposable;
declare global {
export interface SymbolConstructor {
readonly dispose: unique symbol;
readonly asyncDispose: unique symbol;
}
export interface Disposable {
[Symbol.dispose](): any;
}
export interface AsyncDisposable {
[Symbol.asyncDispose](): Promise<any>;
}
}
import { describe, expect, test, jest, afterEach } from "@jest/globals";
import { Lyfe, Lifecycle, NeverAbort } from ".";
function sleep(delayMs: number, signal?: AbortSignal): Promise<void> {
return Lyfe.withAbort(signal).spawn(
(life) =>
new Promise((resolve, reject) => {
const timer = setTimeout(resolve, delayMs);
life.onAbort((_, err) => {
clearTimeout(timer);
reject(err);
});
})
);
}
async function delayThenCall<Return, Args extends any[]>(
delayMs: number,
signal = NeverAbort(),
callback: (...args: Args) => Return,
...args: Args
): Promise<Return> {
await sleep(delayMs, signal);
return callback(...args);
}
function Allocate(life: Lifecycle, length: number): Array<number> {
const arr = new Array(length).fill(length);
life.defer(() => console.log("defer end"));
life.defer(() => arr.splice(0, length));
life.defer(() => sleep(200));
life.defer(() => console.log("defer start"));
return arr;
}
describe("Lyfe", () => {
afterEach(() => {
jest.resetAllMocks();
});
test("extend() should extend Lifecycle to the async task's completion", async () => {
jest.useFakeTimers({ advanceTimers: true });
const delay1 = 20;
const delay2 = 40;
const delay3 = 60;
let tCalled1!: number;
let tCalled2!: number;
let tCalled3!: number;
let tResolve1!: number;
let tResolve2!: number;
let tResolve3!: number;
async function inner1(x: number, signal: AbortSignal): Promise<number> {
try {
tCalled1 = performance.now();
return await delayThenCall(delay2, signal, inner2, x);
} finally {
tResolve2 = performance.now();
}
}
async function inner2(x: number): Promise<number> {
tCalled2 = performance.now();
return x * 2;
}
function inner3(): void {
tCalled3 = performance.now();
}
function doSomething(life: Lifecycle): void {
// Add more async tasks that extends the passed in Lifecycle. Caller
// is unaware of these extended sub tasks but the returned Lifecycle
// Promise will not be fulfilled until all the added sub tasks are
// fulfilled.
life.extend(delayThenCall, delay3, life.signal, inner3).then(() => {
tResolve3 = performance.now();
});
}
const result = await Lyfe.spawn((life) => {
// doSomething() may extend the Lifecycle with more async sub tasks.
// Here the caller doesn't need to know the details of that. The
// function signature doesn't indicate it being an async function
// either. There's no returned Promise to await for.
//
// However, by following the Lifecycle passing contract, we can be
// sure the main task, which was started by the `new Lyfe()` call,
// would not be fulfilled until all the extended sub tasks
// originated from doSomething() are fulfilled.
doSomething(life);
// In this example inner1 resolves after delay1+delay2=500ms, and
// doSomething() scheduled sub task that would take 1000ms to
// resolve. Even though we return the result immediately after
// inner1 resolves, the wrapped Lifecycle Promise will not be
// resolved until the sub task registered inside doSomething() has
// also been resolved. This makes sure we don't "leak" async tasks
// originated from the main task.
return delayThenCall(
delay1,
life.signal,
inner1,
3,
life.signal
).then((result) => {
tResolve1 = performance.now();
return result;
});
});
const tDead = performance.now();
expect(result).toBe(6);
expect(tCalled1).toBe(delay1);
expect(tCalled2).toBe(delay1 + delay2);
expect(tCalled3).toBe(delay3);
expect(tResolve1).toBe(delay1 + delay2);
expect(tResolve2).toBe(delay1 + delay2);
expect(tResolve3).toBe(delay3);
expect(tDead).toBe(delay3);
});
test("Async resource management", async () => {
const spy = jest.spyOn(console, "log");
spy.mockImplementation(() => {});
let arr!: Array<number>;
await Lyfe.spawn(async (life) => {
arr = Allocate(life, 5);
console.log(`start [${arr}]`);
life.extend(
Promise.resolve().then(() => {
console.log(`async [${arr}]`);
})
);
console.log(`end [${arr}]`);
});
console.log(`done [${arr}]`);
expect(spy.mock.calls).toStrictEqual([
["start [5,5,5,5,5]"],
["end [5,5,5,5,5]"],
["async [5,5,5,5,5]"],
["defer start"],
["defer end"],
["done []"],
]);
});
test("Unsafe explicit resource management", async () => {
jest.useFakeTimers({ advanceTimers: true });
class Resource {
constructor(life: Lifecycle) {
this._life = life;
}
private _life!: Lifecycle;
run() {
this._life.defer(() => sleep(20));
this._life.extend(async () => {
await sleep(40);
});
}
}
const unsafeResource = Lyfe.unsafeSpawnAsyncDisposable(
(life) => new Resource(life)
);
unsafeResource.run();
await sleep(60);
await expect(
unsafeResource[Symbol.asyncDispose]()
).resolves.not.toThrow();
expect(performance.now()).toBe(80);
});
test("Cancellation", async () => {
jest.useFakeTimers({ advanceTimers: true });
const abort = new AbortController();
setTimeout(() => abort.abort(new Error("successfully cancelled")), 20);
await expect(
Lyfe.withAbort(abort).spawn(async (life) => {
const c = new AbortController();
await life.withAbort(c).spawn(async () => {
await sleep(40, life.signal);
});
})
).rejects.toThrow("successfully cancelled");
expect(performance.now()).toBe(20);
});
test("Error stack trace", async () => {
const error = await Lyfe.spawn(async function foo(life) {
return await life.spawn(async function bar(life) {
return await life.spawn(async function baz(life) {
return new Error("something wrong");
});
});
});
expect(error.message).toBe("something wrong");
const stack = error.stack!.split("\n").map((l) => l.split(/\s+/));
expect(stack[1][2]).toBe("baz");
expect(stack[2][2]).toBe("Lifecycle.start");
expect(stack[3][2]).toBe("Lifecycle.spawn");
expect(stack[4][2]).toBe("bar");
expect(stack[5][2]).toBe("Lifecycle.start");
expect(stack[6][2]).toBe("Lifecycle.spawn");
expect(stack[7][2]).toBe("foo");
expect(stack[8][2]).toBe("Lifecycle.start");
expect(stack[9][2]).toBe("Lifecycle.spawn");
});
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment