|
// deno-lint-ignore-file require-await |
|
import { AsyncResource } from "node:async_hooks"; |
|
import { Queue } from "./queue.ts"; |
|
import { bind } from "./helpers.ts"; |
|
|
|
export class PLimit |
|
implements |
|
Iterable<void | Promise<void>>, |
|
AsyncIterable<void | Promise<void>>, |
|
Disposable, |
|
AsyncDisposable { |
|
constructor(concurrency?: number) { |
|
if (concurrency) this.concurrency = concurrency; |
|
} |
|
|
|
// #region Public |
|
/** |
|
* The maximum number of promises that can be pending at the same time. |
|
* Must be a non-zero positive integer >= 1. Defaults to the number of |
|
* logical CPUs available on the current system. |
|
* |
|
* @default {navigator.hardwareConcurrency} |
|
*/ |
|
public get concurrency(): number { |
|
return this.#concurrency; |
|
} |
|
|
|
public set concurrency(value: number) { |
|
if ( |
|
typeof value !== "number" || isNaN(value) || |
|
(value < 1) || !Number.isSafeInteger(value) |
|
) { |
|
throw new TypeError( |
|
`Expected 'value' to be a positive non-zero finite integer. Received: '${value}' (${typeof value})`, |
|
); |
|
} |
|
this.#concurrency = value; |
|
} |
|
|
|
/** The number of promises currently active (executing). */ |
|
public get activeCount(): number { |
|
return this.#activeCount; |
|
} |
|
|
|
/** The number of promises currently pending (waiting to execute). */ |
|
public get pendingCount(): number { |
|
return this.#queue.size; |
|
} |
|
|
|
/** Clears the queue and resets the {@link activeCount}. */ |
|
public clear(): void { |
|
this.#activeCount = 0; |
|
this.#queue.clear(); |
|
} |
|
|
|
/** |
|
* Adds a promise-returning or async function to the queue. Returns a promise |
|
* that resolves when the function is finished executing, with respect to the |
|
* concurrency limit. The promise rejects if the function throws an error or |
|
* if the queue is ended before the function has a chance to execute. |
|
* |
|
* @param fn promise-returning or async function |
|
* @param args arguments to pass to the function |
|
* @returns a promise that resolves when the function is finished executing |
|
* |
|
* @example |
|
* ```ts |
|
* const list = new PLimit(1); |
|
* const items = []; |
|
* const runner = async (i: number) => { |
|
* await new Promise((resolve) => setTimeout(resolve, 1000)); |
|
* items.push( |
|
* `| #${(i+"").padEnd(3)} | Active: ${ |
|
* (list.activeCount + "").padEnd(3) |
|
* } | Pending: ${(list.pendingCount+"").padEnd(3)} |`, |
|
* ); |
|
* }; |
|
* |
|
* await Promise.all([ |
|
* list.add(runner, 1), |
|
* list.add(runner, 2), |
|
* list.add(runner, 3), |
|
* list.add(() => Promise.resolve("all done!")), |
|
* ]); |
|
* |
|
* console.log(items.join("\n")); |
|
* ``` |
|
*/ |
|
public add< |
|
const A extends readonly unknown[], |
|
const T = unknown, |
|
>( |
|
fn: PromiseLike<T> | ((...args: A) => T | PromiseLike<T>), |
|
...args: A |
|
): Promise<T> { |
|
const { promise, resolve } = Promise.withResolvers<T>(); |
|
fn = typeof fn === "function" ? fn : () => fn as T | PromiseLike<T>; |
|
this.#enqueue(fn, resolve, args); |
|
return promise; |
|
} |
|
|
|
/** |
|
* This method is an alias for {@link PLimit.add}. |
|
* |
|
* @param fn promise-returning or async function |
|
* @param args arguments to pass to the function |
|
* @returns a promise that resolves when the function is finished executing |
|
*/ |
|
public limit< |
|
const A extends readonly unknown[], |
|
const T = unknown, |
|
>( |
|
fn: PromiseLike<T> | ((...args: A) => T | PromiseLike<T>), |
|
...args: A |
|
): Promise<T> { |
|
return this.add(fn, ...args); |
|
} |
|
|
|
/** |
|
* Drains the queue and returns a promise that resolves when all promises |
|
* have finished executing. The promise rejects immediately if any of the |
|
* promises reject. The queue is cleared after this method is called. The |
|
* results are accumulated into an array and returned once the queue is |
|
* drained. The returned array's elements are guaranteed to be in the same |
|
* order as the functions that were added to the queue. |
|
* |
|
* @returns a promise that resolves to an aggregate array of the results that |
|
* were yielded by each of the functions in the queue. |
|
*/ |
|
public async drain<T = unknown>(): Promise<readonly T[]> { |
|
const results: unknown[] = []; |
|
try { |
|
for await (const fn of this.#queue) { |
|
const result = await fn(); |
|
results.push(result); |
|
} |
|
} finally { |
|
this.clear(); |
|
} |
|
return results as readonly T[]; |
|
} |
|
|
|
/** |
|
* Synchronously iterates over the queue, invoking and yielding the results |
|
* of each function in the queue. The queue is cleared once the iteration is |
|
* complete, regardless of whether the iteration is interrupted by an error. |
|
* |
|
* Unlike its asynchronous counterpart, this method does not accumulate the |
|
* yielded results into an aggregate array. It simply yields the results as |
|
* they are produced. |
|
* |
|
* This method is used internally by the semantics of the `for ... of` loop, |
|
* and is not intended to be called directly. |
|
* |
|
* @category Iteration |
|
*/ |
|
public *[Symbol.iterator](): IterableIterator<Promise<void>> { |
|
try { |
|
for (const fn of this.#queue) yield fn(); |
|
} finally { |
|
this.clear(); |
|
} |
|
} |
|
|
|
/** |
|
* Asynchronously iterates over the queue, invoking and yielding the results |
|
* of each function in the queue. The queue is cleared once the iteration is |
|
* complete, regardless of whether the iteration is interrupted by an error. |
|
* Results are pushed to an array as they are yielded, and once the iteration |
|
* is done the array is returned. |
|
* |
|
* This method is used internally by the semantics of the `for await ... of` |
|
* loop, and is not intended to be called directly. |
|
* |
|
* @category Iteration |
|
* @async |
|
*/ |
|
public async *[Symbol.asyncIterator](): AsyncIterableIterator< |
|
void | Promise<void> |
|
> { |
|
const results: unknown[] = []; |
|
try { |
|
for await (const fn of this.#queue) { |
|
const result = await fn(); |
|
results.push(result); |
|
yield result; |
|
} |
|
} finally { |
|
this.clear(); |
|
} |
|
return results; |
|
} |
|
|
|
/** |
|
* Synchronously disposes of the queue's resources and clears the queue. This |
|
* is a one-time operation, and once a queue has been disposed it cannot be |
|
* used again. This method is called automatically by the semantics of the |
|
* `using` statement from the Explicit Resource Management Proposal. |
|
* |
|
* @category Explicit Resource Mangement |
|
*/ |
|
public [Symbol.dispose](): void { |
|
if (!this.#disposed) { |
|
this.#disposed = true; |
|
this.clear(); |
|
} |
|
} |
|
|
|
/** |
|
* Asynchronously disposes of the queue's resources and clears the queue. |
|
* This is a one-time operation, and once a queue has been disposed it cannot |
|
* be used again. This method is called automatically by the semantics of the |
|
* `await using` statement from the Explicit Resource Management Proposal. |
|
* |
|
* @category Explicit Resource Mangement |
|
*/ |
|
public async [Symbol.asyncDispose](): Promise<void> { |
|
if (!this.#disposed) { |
|
this.#disposed = true; |
|
for await (const result of this) { |
|
await result; |
|
} // drain the queue |
|
} |
|
} |
|
|
|
/** @internal */ |
|
declare public readonly [Symbol.toStringTag]: "PLimit"; |
|
// #endregion Public |
|
|
|
// #region Private |
|
#activeCount = 0; |
|
#queue = new Queue<() => Promise<void>>(); |
|
#concurrency = navigator?.hardwareConcurrency ?? 4; |
|
#disposed = false; |
|
|
|
async #run< |
|
const T, |
|
const A extends readonly unknown[], |
|
const R extends (value: T | PromiseLike<T>) => void, |
|
>(fn: (...args: A) => T | PromiseLike<T>, resolve: R, args: A) { |
|
this.#activeCount++; |
|
const result = (async () => await fn(...args))(); |
|
resolve(result); |
|
try { |
|
await result; |
|
} finally { |
|
this.#next(); |
|
} |
|
} |
|
|
|
#enqueue< |
|
const T, |
|
const A extends readonly unknown[], |
|
const R extends (value: T | PromiseLike<T>) => void, |
|
>(fn: (...args: A) => T | PromiseLike<T>, resolve: R, args: A) { |
|
this.#queue.enqueue( |
|
AsyncResource.bind( |
|
this.#run.bind( |
|
this, |
|
// deno-lint-ignore no-explicit-any |
|
fn.bind(this) as any, |
|
resolve as (v: unknown) => void, |
|
args, |
|
), |
|
), |
|
); |
|
(async () => { |
|
await Promise.resolve(); // force microtask to ensure queue is processed |
|
if (this.activeCount < this.concurrency && this.#queue.size > 0) { |
|
this.#queue.dequeue()?.(); |
|
} |
|
})(); |
|
return this; |
|
} |
|
|
|
#next() { |
|
this.#activeCount--; |
|
if (this.#queue.size > 0) this.#queue.dequeue()?.(); |
|
} |
|
|
|
static { |
|
const value = "PLimit"; |
|
Object.defineProperty(this.prototype, Symbol.toStringTag, { |
|
configurable: true, |
|
writable: false, |
|
value, |
|
}); |
|
Object.defineProperty(this, "name", { configurable: true, value }); |
|
} |
|
// #endregion Private Methods |
|
|
|
// #region Static Methods |
|
/** |
|
* Creates a promise queue with concurrency limit. This method accepts an |
|
* iterable of promises or promise-returning (async) functions, creates a new |
|
* PLimit instance, wraps each item in the iterable with {@link PLimit.add}, |
|
* and finally returns the iterable wrapped with {@link Promise.all}. |
|
* |
|
* @see {@link PLimit.allSettled} for PLimit's version of `Promise.allSettled` |
|
* |
|
* This is a convenience method that is equivalent to the following: |
|
* ```ts |
|
* const limit = new PLimit(2); |
|
* const items = [ |
|
* limit.add(() => Promise.resolve(1)), |
|
* limit.add(() => Promise.resolve(2)), |
|
* // ... |
|
* ]; |
|
* const result = await Promise.all(items); |
|
* ``` |
|
* |
|
* Instead, you can now just write this: |
|
* ```ts |
|
* const result = await PLimit.all([ |
|
* () => Promise.resolve(1), |
|
* () => Promise.resolve(2), |
|
* // ... |
|
* ], 2); |
|
* ``` |
|
* |
|
* @param promises array of promises or async functions that return promises |
|
* @param [concurrency] maximum number of tasks to run concurrently (>= 1) |
|
* @returns a promise that resolves when all wrapped promises have resolved, or rejects if any of the wrapped promises reject |
|
*/ |
|
public static all<const T extends readonly unknown[] | []>( |
|
promises: T, |
|
concurrency?: number, |
|
): Promise< |
|
{ |
|
-readonly [P in keyof T]: Awaited< |
|
// deno-lint-ignore no-explicit-any |
|
T[P] extends (...args: any) => infer R ? R : T[P] |
|
>; |
|
} |
|
>; |
|
public static all<T>( |
|
promises: ReadonlyArray<(() => T | PromiseLike<T>) | PromiseLike<T>>, |
|
concurrency?: number, |
|
): Promise<Awaited<T>[]>; |
|
|
|
/** |
|
* Creates a promise queue with concurrency limit. This method accepts an |
|
* iterable of promises or promise-returning (async) functions, creates a new |
|
* PLimit instance, wraps each item in the iterable with {@link PLimit.add}, |
|
* and finally returns the iterable wrapped with {@link Promise.all}. |
|
* |
|
* @param promises iterable of promises or async promise-returning functions |
|
* @param [concurrency] maximum number of tasks to run concurrently (>= 1) |
|
* @returns a promise that resolves when all wrapped promises have resolved, or rejects if any of the wrapped promises reject |
|
*/ |
|
public static all<T>( |
|
promises: Iterable<(() => T | PromiseLike<T>) | PromiseLike<T>>, |
|
concurrency?: number, |
|
): Promise<Awaited<T>[]>; |
|
|
|
/** |
|
* @template {T} type of the resolved value of each promise |
|
* @param {Iterable<(() => T | PromiseLike<T>) | PromiseLike<T>>} promises |
|
* iterable of promises or async promise-returning functions |
|
* @param {number} [concurrency] maximum tasks to run concurrently (>= 1) |
|
* @returns {Promise<Awaited<T>[]>} a promise that resolves when all wrapped |
|
* promises have resolved, or rejects if any of the wrapped promises reject. |
|
*/ |
|
public static all<T>( |
|
promises: Iterable<(() => T | PromiseLike<T>) | PromiseLike<T>>, |
|
concurrency?: number, |
|
): Promise<Awaited<T>[]> { |
|
const limit = new PLimit(concurrency); |
|
return Promise.all(Array.from(promises, limit.add, limit)); |
|
} |
|
|
|
/** |
|
* Creates a promise queue with concurrency limit. This method accepts an |
|
* iterable of promises or promise-returning (async) functions, creates a new |
|
* PLimit instance, wraps each item in the iterable with {@link PLimit.add}, |
|
* and finally returns the iterable wrapped with {@link Promise.allSettled}. |
|
* |
|
* @see {@link PLimit.all} for PLimit's version of `Promise.all` |
|
* |
|
* This is a convenience method that is equivalent to the following: |
|
* ```ts |
|
* const limit = new PLimit(2); |
|
* const items = [ |
|
* limit.add(() => Promise.resolve(1)), |
|
* limit.add(() => Promise.resolve(2)), |
|
* // ... |
|
* ]; |
|
* const result = await Promise.allSettled(items); |
|
* ``` |
|
* |
|
* Instead, you can now just write this: |
|
* ```ts |
|
* const result = await PLimit.allSettled([ |
|
* () => Promise.resolve(1), |
|
* () => Promise.resolve(2), |
|
* // ... |
|
* ], 2); |
|
* ``` |
|
* |
|
* @param promises array of promises or async functions that return promises |
|
* @param concurrency The maximum number of tasks to run concurrently (>= 1) |
|
* @returns a promise that resolves when all wrapped promises have resolved, or rejects if any of the wrapped promises reject |
|
*/ |
|
public static allSettled<const T extends readonly unknown[] | []>( |
|
promises: T, |
|
concurrency?: number, |
|
): Promise< |
|
{ |
|
-readonly [P in keyof T]: PromiseSettledResult< |
|
// deno-lint-ignore no-explicit-any |
|
Awaited<T[P] extends (...args: any) => infer R ? R : T[P]> |
|
>; |
|
} |
|
>; |
|
public static allSettled<T>( |
|
promises: ReadonlyArray<(() => T | PromiseLike<T>) | PromiseLike<T>>, |
|
concurrency?: number, |
|
): Promise<PromiseSettledResult<Awaited<T>>[]>; |
|
|
|
/** |
|
* Creates a promise queue with concurrency limit. This method accepts an |
|
* iterable of promises or promise-returning (async) functions, creates a new |
|
* PLimit instance, wraps each item in the iterable with {@link PLimit.add}, |
|
* and finally returns the iterable wrapped with {@link Promise.allSettled}. |
|
* |
|
* @param promises iterable of promises or async promise-returning functions |
|
* @param [concurrency] maximum number of tasks to run concurrently (>= 1) |
|
* @returns a promise that resolves when all wrapped promises have resolved, or rejects if any of the wrapped promises reject |
|
*/ |
|
public static allSettled<T>( |
|
promises: Iterable<(() => T | PromiseLike<T>) | PromiseLike<T>>, |
|
concurrency?: number, |
|
): Promise<PromiseSettledResult<Awaited<T>>[]>; |
|
|
|
/** |
|
* @template {T} type of the resolved value of each promise |
|
* @param {Iterable<(() => T | PromiseLike<T>) | PromiseLike<T>>} promises iterable of promises or async promise-returning functions |
|
* @param {number} [concurrency] maximum number of tasks to run concurrently (>= 1) |
|
* @returns {Promise<PromiseSettledResult<Awaited<T>>[]>} a promise that resolves when all wrapped promises have resolved, or rejects if any of the wrapped promises reject |
|
*/ |
|
public static allSettled<T>( |
|
promises: Iterable<(() => T | PromiseLike<T>) | PromiseLike<T>>, |
|
concurrency?: number, |
|
): Promise<PromiseSettledResult<Awaited<T>>[]> { |
|
const limit = new PLimit(concurrency); |
|
return Promise.allSettled(Array.from(promises, limit.add, limit)); |
|
} |
|
// #endregion Static Methods |
|
} |
|
|
|
/** |
|
* Creates a "limiter function" that can be used to limit the number of |
|
* concurrent executions of a given function. |
|
* |
|
* The limiter function accepts a function and any number of arguments to pass |
|
* to the function, and returns a promise that resolves when the function is |
|
* finished executing, with respect to the {@link concurrency} limit. The |
|
* promise rejects if the function throws an error or if the queue is ended |
|
* before the function has a chance to execute. |
|
* |
|
* > **Note¹**: If your use case requires more control over the queue, you can |
|
* create a new instance of {@link PLimit} and use {@link PLimit.add} instead. |
|
* |
|
* > **Note²**: this function is mainly here for convenience and to preserve |
|
* the behavior of the NPM package `p-limit` this module is based on. |
|
* |
|
* @param concurrency maximum number of tasks to run concurrently (>= 1) |
|
* @returns a "limiter function" that can be used to limit the {@link concurrency} of a given function's execution. |
|
*/ |
|
export function pLimit(concurrency?: number): { |
|
<const A extends readonly unknown[], T = unknown>( |
|
fn: (this: PLimit, ...args: A) => T | PromiseLike<T>, |
|
...args: A |
|
): Promise<T>; |
|
} { |
|
const p = new PLimit(concurrency); |
|
return bind(p.add, p) as typeof p.add; |
|
} |