@nberlette
Last active June 12, 2024 19:25
`PLimit`: TypeScript re-imagining of p-limit and yocto-queue. Friendly with Deno, Bun, and Node.
dist
node_modules
*.env*
*.log
.*.log
*.lock*
*-lock.*
.DS_Store
Thumbs.db
.Spotlight-V100
*.pem
*.p12
.cache
.deno
.npm

PLimit: Promise Queue with Concurrency Control

This is a simple TypeScript implementation of a promise queue with fine-grained concurrency control. It was inspired by (and borrows heavily from) the p-limit package, with a few additional features.

Usage

PLimit

import { PLimit } from "https://gist.githubusercontent.com/nberlette/e51ce49dc356018cd25a5cdf4ad2cfd0/raw/p-limit.ts";

new PLimit(concurrency?: number)

const p = new PLimit(/* concurrency */ 1);

const tasks = [
  () => new Promise((resolve) => setTimeout(resolve, 1000)),
  async () => await fetch("https://example.com").then((r) => r.text()),
  async () => {
    await new Promise((resolve) => setTimeout(resolve, 1000));
    return "done";
  },
];

// Call `limit` through the instance: it is a prototype method that relies on `this`.
const results = await Promise.all(tasks.map((task) => p.limit(task)));
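
To make the idea of a concurrency limit concrete, here is a minimal sketch of how a limiter of this kind can work. It is not PLimit's actual implementation: `createLimiter`, `waiting`, and `next` are hypothetical names used only for illustration, and the sketch assumes every task is a promise-returning function.

```typescript
// Hypothetical helper, not part of PLimit's API.
function createLimiter(concurrency: number) {
  if (!Number.isSafeInteger(concurrency) || concurrency < 1) {
    throw new TypeError("concurrency must be a positive integer");
  }

  let active = 0; // tasks currently executing
  const waiting: Array<() => void> = []; // starters for tasks not yet begun

  // Called whenever a task settles: free the slot and start the next waiter.
  const next = (): void => {
    active--;
    const start = waiting.shift();
    if (start) start();
  };

  return function limit<T>(task: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const start = (): void => {
        active++;
        // Going through `then` turns a synchronous throw into a rejection.
        Promise.resolve().then(task).then(resolve, reject).finally(next);
      };
      if (active < concurrency) start();
      else waiting.push(start);
    });
  };
}

// Usage: at most two of these tasks run at the same time.
const sleep = (ms: number) => new Promise<void>((r) => setTimeout(r, ms));
const limitTwo = createLimiter(2);
Promise.all(
  [1, 2, 3].map((n) => limitTwo(async () => { await sleep(10); return n; })),
).then((values) => console.log(values));
```

The returned promises settle with each task's own result, so they can be passed straight to `Promise.all`, which is the same pattern the example above uses.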

PLimit.all(tasks: Promise[], concurrency?: number)

After noticing that I (and many others) often use p-limit together with Promise.all, I decided to integrate the two. PLimit.all works like Promise.all, except that it accepts promise-returning functions (or promises) and takes an additional concurrency parameter.

const tasks = [
  () => Promise.resolve(1),
  () => Promise.resolve(2),
  () => Promise.resolve(3),
];
const results = await PLimit.all(tasks, /* concurrency */ 1);

Take a look at what the code above would look like if you were to use Promise.all and p-limit together:

const p = new PLimit(/* concurrency */ 1);

const tasks = [
  p.limit(() => Promise.resolve(1)),
  p.limit(() => Promise.resolve(2)),
  p.limit(() => Promise.resolve(3)),
];

const results = await Promise.all(tasks);
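
For readers curious how an all-in-one helper of this shape might be built, here is a minimal sketch using a small worker pool. This is a different technique from the queue PLimit uses, shown only to illustrate the idea; `limitedAll` is a hypothetical name, and the sketch assumes every task is a promise-returning function.

```typescript
// Hypothetical stand-in for the idea behind PLimit.all (not the gist's code).
async function limitedAll<T>(
  tasks: ReadonlyArray<() => Promise<T>>,
  concurrency = 4,
): Promise<T[]> {
  const results = new Array<T>(tasks.length);
  let cursor = 0; // index of the next task that no worker has claimed yet

  // Each worker keeps claiming the next unclaimed index until none remain.
  const worker = async (): Promise<void> => {
    while (cursor < tasks.length) {
      const index = cursor++;
      const task = tasks[index];
      if (task !== undefined) results[index] = await task();
    }
  };

  const workerCount = Math.max(1, Math.min(concurrency, tasks.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results; // same order as `tasks`, regardless of completion order
}

// Usage: run three tasks strictly one at a time.
limitedAll(
  [() => Promise.resolve(1), () => Promise.resolve(2), () => Promise.resolve(3)],
  1,
).then((results) => console.log(results));
```

As with `Promise.all`, the returned promise rejects as soon as any task rejects. In this sketch the remaining workers keep running their current tasks after that point.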

PLimit.allSettled(tasks: Promise[], concurrency?: number)

PLimit.allSettled is a drop-in replacement for Promise.allSettled that takes an additional concurrency parameter.

const tasks = [
  () => Promise.resolve(1),
  () => Promise.reject(2),
  () => Promise.resolve(3),
];

const results = await PLimit.allSettled(tasks, /* concurrency */ 1);

// results = [
//   { status: "fulfilled", value: 1 },
//   { status: "rejected", reason: 2 },
//   { status: "fulfilled", value: 3 },
// ];
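
Since the result is an array of settled outcomes in the same shape as the standard `Promise.allSettled`, it is often handy to split it into values and errors. The sketch below uses the built-in `Promise.allSettled` so that it is self-contained; `partitionSettled` is a hypothetical helper name, not something this gist exports.

```typescript
// Hypothetical helper: split settled outcomes into values and reasons.
function partitionSettled<T>(
  results: ReadonlyArray<PromiseSettledResult<T>>,
): { values: T[]; reasons: unknown[] } {
  const values: T[] = [];
  const reasons: unknown[] = [];
  for (const result of results) {
    if (result.status === "fulfilled") values.push(result.value);
    else reasons.push(result.reason);
  }
  return { values, reasons };
}

// Usage with the standard Promise.allSettled.
const inputs: Promise<number>[] = [
  Promise.resolve(1),
  Promise.reject(2),
  Promise.resolve(3),
];
Promise.allSettled(inputs).then((settled) => {
  const { values, reasons } = partitionSettled(settled);
  console.log(values, reasons);
});
```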

for-await-of

PLimit implements the AsyncIterable interface, so you can use it in a for-await-of loop.

const p = new PLimit(/* concurrency */ 1);

const tasks = [
  () => Promise.resolve(1),
  () => Promise.resolve(2),
  () => Promise.resolve(3),
].map((task) => p.limit(task));

for await (const result of p) console.log(result);
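
As a rough illustration of what implementing AsyncIterable involves, the sketch below shows a tiny class that can be consumed with for-await-of. It is a simplified stand-in, not PLimit itself: `TaskList` is a hypothetical name, and it runs its tasks strictly one at a time.

```typescript
// Hypothetical class: a minimal AsyncIterable over promise-returning tasks.
class TaskList<T> implements AsyncIterable<T> {
  private readonly tasks: Array<() => Promise<T>> = [];

  add(task: () => Promise<T>): this {
    this.tasks.push(task);
    return this;
  }

  async *[Symbol.asyncIterator](): AsyncGenerator<T, void, undefined> {
    try {
      // Run one task at a time and hand each result to the consumer.
      for (const task of this.tasks) yield await task();
    } finally {
      // Runs on normal completion, on `break`, and when a task throws.
      this.tasks.length = 0;
    }
  }
}

async function main(): Promise<void> {
  const list = new TaskList<number>()
    .add(() => Promise.resolve(1))
    .add(() => Promise.resolve(2));
  for await (const value of list) console.log(value);
}
main();
```

Clearing the list in a `finally` block mirrors the behaviour described for PLimit's iterators, where the queue is cleared once iteration ends.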

await using

PLimit implements the AsyncDisposable interface from the TC39 Proposal for Explicit Resource Management, so you can use it with the new await using syntax in TypeScript v5.2+:

async function doSomething() {
  await using p = new PLimit(/* concurrency */ 1);

  const tasks = [
    () => Promise.resolve(1),
    // ... some resource-intensive tasks here ...
    () => Promise.resolve(2),
  ].map((task) => p.add(task));

  for await (const result of p) console.log(result);

  // `p` is automatically disposed here, even if an error is thrown or
  // its queue is not fully drained by the time it goes out of scope.
}

using

PLimit implements the Disposable interface from the TC39 Proposal for Explicit Resource Management, so you can use it with the new using syntax in TypeScript v5.2+:

function doSomethingSyncIsh() {
  using p = new PLimit(/* concurrency */ 1);

  const tasks = [
    () => Promise.resolve(1),
    // ... some resource-intensive tasks here ...
    () => Promise.resolve(2),
  ].map((task) => p.add(task));

  for (const result of p) result.then(console.log);

  // `p` is automatically disposed here, even if an error is thrown or
  // its queue is not fully drained by the time it goes out of scope.
}

Queue

This is a simple queue implementation that can be used to enqueue functions that return promises. It is used internally by PLimit, but it can also be used on its own. It was inspired by the yocto-queue package.

import { Queue } from "https://gist.githubusercontent.com/nberlette/e51ce49dc356018cd25a5cdf4ad2cfd0/raw/queue.ts";

new Queue()

const queue = new Queue();

queue.enqueue(() => Promise.resolve(1));
queue.enqueue(() => Promise.resolve(2));
queue.enqueue(() => Promise.resolve(3));

for (const result of queue) {
  console.log(await result);
}
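
The queue.ts source is not reproduced here, so as a rough guide, this is a minimal sketch of a linked-list queue in the spirit of yocto-queue. `LinkedQueue` and `QueueNode` are hypothetical names, and the choice to make iteration non-destructive is an assumption of this sketch rather than a statement about the gist's Queue.

```typescript
// Hypothetical sketch of a singly linked FIFO queue with O(1) enqueue/dequeue.
interface QueueNode<T> {
  value: T;
  next: QueueNode<T> | undefined;
}

class LinkedQueue<T> implements Iterable<T> {
  private head: QueueNode<T> | undefined = undefined;
  private tail: QueueNode<T> | undefined = undefined;
  private count = 0;

  get size(): number {
    return this.count;
  }

  enqueue(value: T): void {
    const node: QueueNode<T> = { value, next: undefined };
    if (this.tail) this.tail.next = node;
    else this.head = node; // first element: head and tail are the same node
    this.tail = node;
    this.count++;
  }

  dequeue(): T | undefined {
    const node = this.head;
    if (!node) return undefined;
    this.head = node.next;
    if (!this.head) this.tail = undefined; // queue became empty
    this.count--;
    return node.value;
  }

  clear(): void {
    this.head = undefined;
    this.tail = undefined;
    this.count = 0;
  }

  // Walks the list without removing anything (an assumption of this sketch).
  *[Symbol.iterator](): IterableIterator<T> {
    for (let node = this.head; node; node = node.next) yield node.value;
  }
}

const numbers = new LinkedQueue<number>();
numbers.enqueue(1);
numbers.enqueue(2);
console.log(numbers.dequeue(), numbers.size);
```

A linked list avoids the cost of `Array.prototype.shift`, which is linear in the array length, and that matters when many tasks are queued.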

Compatibility

This was developed with Deno in mind, but it should work just fine in Node.js and Bun as well. Browser usage is currently not supported due to the dependency on async_hooks.


MIT © Nicholas Berlette and Sindre Sorhus. All rights reserved.

{
  "name": "@nick/p-limit",
  "version": "0.1.0",
  "exports": {
    ".": "./mod.ts",
    "./node": "./p-limit.node.ts",
    "./queue": "./queue.ts",
    "./p-limit": "./p-limit.ts"
  },
  "publish": {
    "include": [
      "**/*.ts",
      "**/*.json",
      "**/*.md",
      "LICENSE"
    ],
    "exclude": [
      "**/test*",
      "**/dist",
      "**/*.test.*"
    ]
  },
  "lock": false
}
const $bind = Function.prototype.bind;

export function bind<
  T,
  const A extends readonly unknown[],
  const B extends readonly unknown[],
  R = unknown,
  U extends Record<string | symbol, any> = Record<never, never>,
>(
  target: ((...args: [...A, ...B]) => R) & U,
  thisArg: T,
  ...args: A
):
  & ([T & {}] extends [never] ? (...args: B) => R : (this: T, ...args: B) => R)
  & ([(keyof U) & {}] extends [never] ? unknown : { [K in keyof U]: U[K] }) {
  const props = Object.getOwnPropertyDescriptors(target);
  const value = target.name;
  const fn = $bind.call(target, thisArg, ...args);
  Object.defineProperty(fn, "name", { value, configurable: true });
  for (const key in props) {
    // skip the constructor and name properties
    if (key === "constructor" || key === "name") continue;
    const descriptor = props[key];
    const { value, get, set } = descriptor;
    if (value && typeof value === "function") {
      // bind static methods to the original thisArg
      // - note: this uses the new bind method, not the original, meaning it
      //   may recursively bind any nested methods all the way down the chain
      descriptor.value = bind(value, target);
    } else {
      // bind static getters/setters to the original thisArg
      // - note these use the original Function.prototype.bind since they don't
      //   require any special logic here.
      if (get && typeof get === "function") {
        descriptor.get = $bind.call(get, target);
      }
      if (set && typeof set === "function") {
        descriptor.set = $bind.call(set, target);
      }
    }
    Object.defineProperty(fn, key, descriptor);
  }
  return fn;
}
The MIT License (MIT)
Copyright © 2023 Nicholas Berlette (https://github.com/nberlette)
Copyright © 2023 Sindre Sorhus (https://github.com/sindresorhus)
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the “Software”), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import { PLimit } from "./p-limit.node.ts";
import assert from "node:assert";
import { AsyncLocalStorage } from "node:async_hooks";
import { pLimitTest, scenarios } from "./p-limit.test.ts";
const { test } = Deno;
if (import.meta.main) {
test("PLimit scenarios perform consistent with expectations", async (t) => {
for (let i = 0; i < scenarios.length; i++) {
const scenario = scenarios[i];
const { concurrency, length, delay, expected } = scenario;
await t.step(
`runs: ${length}, concurrency: ${concurrency}, delay: ${delay}, expected: ${expected} ms`,
async () => await pLimitTest({ concurrency, length, delay, expected }),
);
}
});
/**
* This test was added in p-limit 5.0.0 to ensure that the async execution
* context is properly propagated.
*
* It requires Deno v1.30.0+ or Node.js v18.0.0+.
*/
test("PLimit propagates async execution context properly", async () => {
const concurrency = 2;
const limit = new PLimit(concurrency);
const store = new AsyncLocalStorage<{ id: number }>();
const checkId = async (id: number) => {
await Promise.resolve();
assert.equal(id, store.getStore()?.id);
};
const startContext = async (id: number) =>
await store.run({ id }, () => limit.add(checkId, id));
await Promise.all(
Array.from({ length: 100 }, (_, id) => startContext(id)),
);
});
}
// deno-lint-ignore-file require-await
import { AsyncResource } from "node:async_hooks";
import { Queue } from "./queue.ts";
import { bind } from "./helpers.ts";
export class PLimit
implements
Iterable<void | Promise<void>>,
AsyncIterable<void | Promise<void>>,
Disposable,
AsyncDisposable {
constructor(concurrency?: number) {
if (concurrency !== undefined) this.concurrency = concurrency;
}
// #region Public
/**
* The maximum number of promises that can be active (executing) at the same
* time. Must be a positive integer (>= 1). Defaults to the number of
* logical CPUs available on the current system.
*
* @default {navigator.hardwareConcurrency}
*/
public get concurrency(): number {
return this.#concurrency;
}
public set concurrency(value: number) {
if (
typeof value !== "number" || isNaN(value) ||
(value < 1) || !Number.isSafeInteger(value)
) {
throw new TypeError(
`Expected 'value' to be a positive non-zero finite integer. Received: '${value}' (${typeof value})`,
);
}
this.#concurrency = value;
}
/** The number of promises currently active (executing). */
public get activeCount(): number {
return this.#activeCount;
}
/** The number of promises currently pending (waiting to execute). */
public get pendingCount(): number {
return this.#queue.size;
}
/** Clears the queue and resets the {@link activeCount}. */
public clear(): void {
this.#activeCount = 0;
this.#queue.clear();
}
/**
* Adds a promise-returning or async function to the queue. Returns a promise
* that resolves when the function is finished executing, with respect to the
* concurrency limit. The promise rejects if the function throws an error or
* if the queue is ended before the function has a chance to execute.
*
* @param fn promise-returning or async function
* @param args arguments to pass to the function
* @returns a promise that resolves when the function is finished executing
*
* @example
* ```ts
* const list = new PLimit(1);
* const items = [];
* const runner = async (i: number) => {
* await new Promise((resolve) => setTimeout(resolve, 1000));
* items.push(
* `| #${(i+"").padEnd(3)} | Active: ${
* (list.activeCount + "").padEnd(3)
* } | Pending: ${(list.pendingCount+"").padEnd(3)} |`,
* );
* };
*
* await Promise.all([
* list.add(runner, 1),
* list.add(runner, 2),
* list.add(runner, 3),
* list.add(() => Promise.resolve("all done!")),
* ]);
*
* console.log(items.join("\n"));
* ```
*/
public add<
const A extends readonly unknown[],
const T = unknown,
>(
fn: PromiseLike<T> | ((...args: A) => T | PromiseLike<T>),
...args: A
): Promise<T> {
const { promise, resolve } = Promise.withResolvers<T>();
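// Capture the original value first: a closure over the reassigned `fn`
// would return the wrapper function itself instead of the promise.
const input = fn;
const task = typeof input === "function"
? input
: () => input as T | PromiseLike<T>;
this.#enqueue(task, resolve, args);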
return promise;
}
/**
* This method is an alias for {@link PLimit.add}.
*
* @param fn promise-returning or async function
* @param args arguments to pass to the function
* @returns a promise that resolves when the function is finished executing
*/
public limit<
const A extends readonly unknown[],
const T = unknown,
>(
fn: PromiseLike<T> | ((...args: A) => T | PromiseLike<T>),
...args: A
): Promise<T> {
return this.add(fn, ...args);
}
/**
* Drains the queue and returns a promise that resolves when all promises
* have finished executing. The promise rejects immediately if any of the
* promises reject. The queue is cleared after this method is called. The
* results are accumulated into an array and returned once the queue is
* drained. The returned array's elements are guaranteed to be in the same
* order as the functions that were added to the queue.
*
* @returns a promise that resolves to an aggregate array of the results that
* were yielded by each of the functions in the queue.
*/
public async drain<T = unknown>(): Promise<readonly T[]> {
const results: unknown[] = [];
try {
for await (const fn of this.#queue) {
const result = await fn();
results.push(result);
}
} finally {
this.clear();
}
return results as readonly T[];
}
/**
* Synchronously iterates over the queue, invoking and yielding the results
* of each function in the queue. The queue is cleared once the iteration is
* complete, regardless of whether the iteration is interrupted by an error.
*
* Unlike its asynchronous counterpart, this method does not accumulate the
* yielded results into an aggregate array. It simply yields the results as
* they are produced.
*
* This method is used internally by the semantics of the `for ... of` loop,
* and is not intended to be called directly.
*
* @category Iteration
*/
public *[Symbol.iterator](): IterableIterator<Promise<void>> {
try {
for (const fn of this.#queue) yield fn();
} finally {
this.clear();
}
}
/**
* Asynchronously iterates over the queue, invoking and yielding the results
* of each function in the queue. The queue is cleared once the iteration is
* complete, regardless of whether the iteration is interrupted by an error.
* Results are pushed to an array as they are yielded, and once the iteration
* is done the array is returned.
*
* This method is used internally by the semantics of the `for await ... of`
* loop, and is not intended to be called directly.
*
* @category Iteration
* @async
*/
public async *[Symbol.asyncIterator](): AsyncIterableIterator<
void | Promise<void>
> {
const results: unknown[] = [];
try {
for await (const fn of this.#queue) {
const result = await fn();
results.push(result);
yield result;
}
} finally {
this.clear();
}
return results;
}
/**
* Synchronously disposes of the queue's resources and clears the queue. This
* is a one-time operation, and once a queue has been disposed it cannot be
* used again. This method is called automatically by the semantics of the
* `using` statement from the Explicit Resource Management Proposal.
*
* @category Explicit Resource Management
*/
public [Symbol.dispose](): void {
if (!this.#disposed) {
this.#disposed = true;
this.clear();
}
}
/**
* Asynchronously disposes of the queue's resources and clears the queue.
* This is a one-time operation, and once a queue has been disposed it cannot
* be used again. This method is called automatically by the semantics of the
* `await using` statement from the Explicit Resource Management Proposal.
*
* @category Explicit Resource Management
*/
public async [Symbol.asyncDispose](): Promise<void> {
if (!this.#disposed) {
this.#disposed = true;
for await (const result of this) {
await result;
} // drain the queue
}
}
/** @internal */
declare public readonly [Symbol.toStringTag]: "PLimit";
// #endregion Public
// #region Private
#activeCount = 0;
#queue = new Queue<() => Promise<void>>();
// `globalThis.` avoids a ReferenceError on runtimes without a global `navigator`
#concurrency = globalThis.navigator?.hardwareConcurrency ?? 4;
#disposed = false;
async #run<
const T,
const A extends readonly unknown[],
const R extends (value: T | PromiseLike<T>) => void,
>(fn: (...args: A) => T | PromiseLike<T>, resolve: R, args: A) {
this.#activeCount++;
const result = (async () => await fn(...args))();
resolve(result);
try {
await result;
} finally {
this.#next();
}
}
#enqueue<
const T,
const A extends readonly unknown[],
const R extends (value: T | PromiseLike<T>) => void,
>(fn: (...args: A) => T | PromiseLike<T>, resolve: R, args: A) {
this.#queue.enqueue(
AsyncResource.bind(
this.#run.bind(
this,
// deno-lint-ignore no-explicit-any
fn.bind(this) as any,
resolve as (v: unknown) => void,
args,
),
),
);
(async () => {
await Promise.resolve(); // force microtask to ensure queue is processed
if (this.activeCount < this.concurrency && this.#queue.size > 0) {
this.#queue.dequeue()?.();
}
})();
return this;
}
#next() {
this.#activeCount--;
if (this.#queue.size > 0) this.#queue.dequeue()?.();
}
static {
const value = "PLimit";
Object.defineProperty(this.prototype, Symbol.toStringTag, {
configurable: true,
writable: false,
value,
});
Object.defineProperty(this, "name", { configurable: true, value });
}
// #endregion Private Methods
// #region Static Methods
/**
* Creates a promise queue with concurrency limit. This method accepts an
* iterable of promises or promise-returning (async) functions, creates a new
* PLimit instance, wraps each item in the iterable with {@link PLimit.add},
* and finally returns the iterable wrapped with {@link Promise.all}.
*
* @see {@link PLimit.allSettled} for PLimit's version of `Promise.allSettled`
*
* This is a convenience method that is equivalent to the following:
* ```ts
* const limit = new PLimit(2);
* const items = [
* limit.add(() => Promise.resolve(1)),
* limit.add(() => Promise.resolve(2)),
* // ...
* ];
* const result = await Promise.all(items);
* ```
*
* Instead, you can now just write this:
* ```ts
* const result = await PLimit.all([
* () => Promise.resolve(1),
* () => Promise.resolve(2),
* // ...
* ], 2);
* ```
*
* @param promises array of promises or async functions that return promises
* @param [concurrency] maximum number of tasks to run concurrently (>= 1)
* @returns a promise that resolves when all wrapped promises have resolved, or rejects if any of the wrapped promises reject
*/
public static all<const T extends readonly unknown[] | []>(
promises: T,
concurrency?: number,
): Promise<
{
-readonly [P in keyof T]: Awaited<
// deno-lint-ignore no-explicit-any
T[P] extends (...args: any) => infer R ? R : T[P]
>;
}
>;
public static all<T>(
promises: ReadonlyArray<(() => T | PromiseLike<T>) | PromiseLike<T>>,
concurrency?: number,
): Promise<Awaited<T>[]>;
/**
* Creates a promise queue with concurrency limit. This method accepts an
* iterable of promises or promise-returning (async) functions, creates a new
* PLimit instance, wraps each item in the iterable with {@link PLimit.add},
* and finally returns the iterable wrapped with {@link Promise.all}.
*
* @param promises iterable of promises or async promise-returning functions
* @param [concurrency] maximum number of tasks to run concurrently (>= 1)
* @returns a promise that resolves when all wrapped promises have resolved, or rejects if any of the wrapped promises reject
*/
public static all<T>(
promises: Iterable<(() => T | PromiseLike<T>) | PromiseLike<T>>,
concurrency?: number,
): Promise<Awaited<T>[]>;
/**
* @template {T} type of the resolved value of each promise
* @param {Iterable<(() => T | PromiseLike<T>) | PromiseLike<T>>} promises
* iterable of promises or async promise-returning functions
* @param {number} [concurrency] maximum tasks to run concurrently (>= 1)
* @returns {Promise<Awaited<T>[]>} a promise that resolves when all wrapped
* promises have resolved, or rejects if any of the wrapped promises reject.
*/
public static all<T>(
promises: Iterable<(() => T | PromiseLike<T>) | PromiseLike<T>>,
concurrency?: number,
): Promise<Awaited<T>[]> {
const limit = new PLimit(concurrency);
return Promise.all(Array.from(promises, limit.add, limit));
}
/**
* Creates a promise queue with concurrency limit. This method accepts an
* iterable of promises or promise-returning (async) functions, creates a new
* PLimit instance, wraps each item in the iterable with {@link PLimit.add},
* and finally returns the iterable wrapped with {@link Promise.allSettled}.
*
* @see {@link PLimit.all} for PLimit's version of `Promise.all`
*
* This is a convenience method that is equivalent to the following:
* ```ts
* const limit = new PLimit(2);
* const items = [
* limit.add(() => Promise.resolve(1)),
* limit.add(() => Promise.resolve(2)),
* // ...
* ];
* const result = await Promise.allSettled(items);
* ```
*
* Instead, you can now just write this:
* ```ts
* const result = await PLimit.allSettled([
* () => Promise.resolve(1),
* () => Promise.resolve(2),
* // ...
* ], 2);
* ```
*
* @param promises array of promises or async functions that return promises
* @param concurrency The maximum number of tasks to run concurrently (>= 1)
* @returns a promise that resolves once all wrapped promises have settled (fulfilled or rejected), with an array describing each outcome
*/
public static allSettled<const T extends readonly unknown[] | []>(
promises: T,
concurrency?: number,
): Promise<
{
-readonly [P in keyof T]: PromiseSettledResult<
// deno-lint-ignore no-explicit-any
Awaited<T[P] extends (...args: any) => infer R ? R : T[P]>
>;
}
>;
public static allSettled<T>(
promises: ReadonlyArray<(() => T | PromiseLike<T>) | PromiseLike<T>>,
concurrency?: number,
): Promise<PromiseSettledResult<Awaited<T>>[]>;
/**
* Creates a promise queue with concurrency limit. This method accepts an
* iterable of promises or promise-returning (async) functions, creates a new
* PLimit instance, wraps each item in the iterable with {@link PLimit.add},
* and finally returns the iterable wrapped with {@link Promise.allSettled}.
*
* @param promises iterable of promises or async promise-returning functions
* @param [concurrency] maximum number of tasks to run concurrently (>= 1)
* @returns a promise that resolves once all wrapped promises have settled (fulfilled or rejected), with an array describing each outcome
*/
public static allSettled<T>(
promises: Iterable<(() => T | PromiseLike<T>) | PromiseLike<T>>,
concurrency?: number,
): Promise<PromiseSettledResult<Awaited<T>>[]>;
/**
* @template {T} type of the resolved value of each promise
* @param {Iterable<(() => T | PromiseLike<T>) | PromiseLike<T>>} promises iterable of promises or async promise-returning functions
* @param {number} [concurrency] maximum number of tasks to run concurrently (>= 1)
* @returns {Promise<PromiseSettledResult<Awaited<T>>[]>} a promise that resolves once all wrapped promises have settled (fulfilled or rejected), with an array describing each outcome
*/
public static allSettled<T>(
promises: Iterable<(() => T | PromiseLike<T>) | PromiseLike<T>>,
concurrency?: number,
): Promise<PromiseSettledResult<Awaited<T>>[]> {
const limit = new PLimit(concurrency);
return Promise.allSettled(Array.from(promises, limit.add, limit));
}
// #endregion Static Methods
}
/**
* Creates a "limiter function" that can be used to limit the number of
* concurrent executions of a given function.
*
* The limiter function accepts a function and any number of arguments to pass
* to the function, and returns a promise that resolves when the function is
* finished executing, with respect to the {@link concurrency} limit. The
* promise rejects if the function throws an error or if the queue is ended
* before the function has a chance to execute.
*
* > **Note¹**: If your use case requires more control over the queue, you can
* create a new instance of {@link PLimit} and use {@link PLimit.add} instead.
*
* > **Note²**: this function is mainly here for convenience and to preserve
* the behavior of the NPM package `p-limit` this module is based on.
*
* @param concurrency maximum number of tasks to run concurrently (>= 1)
* @returns a "limiter function" that can be used to limit the {@link concurrency} of a given function's execution.
*/
export function pLimit(concurrency?: number): {
<const A extends readonly unknown[], T = unknown>(
fn: (this: PLimit, ...args: A) => T | PromiseLike<T>,
...args: A
): Promise<T>;
} {
const p = new PLimit(concurrency);
return bind(p.add, p) as typeof p.add;
}
import { PLimit } from "./p-limit.ts";
import assert from "node:assert";
export async function pLimitTest({
PLimit: P = PLimit,
expected = -1,
concurrency = 1,
length = 100,
delay = 100 as (() => number) | number,
} = {}): Promise<void> {
if (typeof delay === "number" && expected === -1) {
expected = Math.ceil((length / concurrency) * delay);
}
let active = 0;
const tasks = Array.from({ length }, (_, i) => async () => {
active++;
await new Promise((resolve) =>
setTimeout(resolve, typeof delay === "function" ? delay() : delay)
);
assert(
concurrency >= active,
`concurrency >= active (${concurrency} >= ${active})`,
);
active--;
return `task${i + 1}`;
});
performance.mark(`tasks:start`);
const results = await P.all(tasks, concurrency);
performance.mark(`tasks:end`);
const measure = performance.measure("elapsed", "tasks:start", "tasks:end");
const elapsed = measure.duration;
assert.strictEqual(results.length, length, `results.length === ${length}`);
assert(
results.every((result, i) => result === `task${i + 1}`),
"all tasks performed as expected",
);
if (typeof delay === "number" && expected > 0) {
const actual = Math.floor(elapsed);
assert(
actual >= expected && actual < expected * (4 / 3),
`expected ${expected} ms <= actual time (${actual} ms) < ${expected} ms + 33.3%`,
);
}
}
/**
* Tests the PLimit module's behavior against a set of scenarios, ensuring it
* performs as expected across a variety of concurrency, length, and delay
* values. Since the three variables are interdependent, the expected execution
* time can be calculated as follows:
*
* expected = Math.ceil((length / concurrency) * delay)
*
* For example, if concurrency is 2, length is 10, and delay is 100, then at
* most 2 tasks run simultaneously and each task takes 100ms to complete.
* For 10 tasks, expect `(10 / 2) * 100 = 500ms` total time.
*/
export const scenarios = [
/** Baseline: 1 task @ 1 concurrency @ 100ms delay = 100ms */
{ concurrency: 1, length: 1, delay: 100, expected: 100 },
/** 10 tasks / 1 concurrency x 100ms delay = 1s */
{ concurrency: 1, length: 10, delay: 100, expected: 1000 },
/** 10 tasks / 2 concurrency x 100ms delay = 500ms */
{ concurrency: 2, length: 10, delay: 100, expected: 500 },
/** 10 tasks / 5 concurrency x 100ms delay = 200ms */
{ concurrency: 5, length: 10, delay: 100, expected: 200 },
/** 10 tasks / 5 concurrency x 250ms delay = 500ms */
{ concurrency: 5, length: 10, delay: 250, expected: 500 },
/** 10 tasks / 5 concurrency x 500ms delay = 1s */
{ concurrency: 5, length: 10, delay: 500, expected: 1000 },
/** 50 tasks / 1 concurrency x 10ms delay = 500ms */
{ concurrency: 1, length: 50, delay: 10, expected: 500 },
/** 50 tasks / 1 concurrency x 100ms delay = 5s */
{ concurrency: 1, length: 50, delay: 100, expected: 5000 },
/** 50 tasks / 5 concurrency x 100ms delay = 1s */
{ concurrency: 5, length: 50, delay: 100, expected: 1000 },
/** 50 tasks / 2 concurrency x 100ms delay = 2.5s */
{ concurrency: 2, length: 50, delay: 100, expected: 2500 },
/** 50 tasks / 5 concurrency x 100ms delay = 1s */
{ concurrency: 5, length: 50, delay: 100, expected: 1000 },
/** 50 tasks / 5 concurrency x 250ms delay = 2.5s */
{ concurrency: 5, length: 50, delay: 250, expected: 2500 },
/** 100 tasks / 10 concurrency x 10ms delay = 100ms */
{ concurrency: 10, length: 100, delay: 10, expected: 100 },
];
const { test } = Deno;
if (import.meta.main) {
test("PLimit scenarios perform consistent with expectations", async (t) => {
for (let i = 0; i < scenarios.length; i++) {
const scenario = scenarios[i];
const { concurrency, length, delay, expected } = scenario;
await t.step(
`runs: ${length}, concurrency: ${concurrency}, delay: ${delay}, expected: ${expected} ms`,
async () => await pLimitTest({ concurrency, length, delay, expected }),
);
}
});
}
// deno-lint-ignore-file require-await
import { Queue } from "./queue.ts";
import { bind } from "./helpers.ts";
export class PLimit
implements
Iterable<void | Promise<void>>,
AsyncIterable<void | Promise<void>>,
Disposable,
AsyncDisposable {
constructor(concurrency?: number) {
if (concurrency !== undefined) this.concurrency = concurrency;
}
// #region Public
/**
* The maximum number of promises that can be active (executing) at the same
* time. Must be a positive integer (>= 1). Defaults to the number of
* logical CPUs available on the current system.
*
* @default {navigator.hardwareConcurrency}
*/
public get concurrency(): number {
return this.#concurrency;
}
public set concurrency(value: number) {
if (
typeof value !== "number" || isNaN(value) ||
(value < 1) || !Number.isSafeInteger(value)
) {
throw new TypeError(
`Expected 'value' to be a positive non-zero finite integer. Received: '${value}' (${typeof value})`,
);
}
this.#concurrency = value;
}
/** The number of promises currently active (executing). */
public get activeCount(): number {
return this.#activeCount;
}
/** The number of promises currently pending (waiting to execute). */
public get pendingCount(): number {
return this.#queue.size;
}
/** Clears the queue and resets the {@link activeCount}. */
public clear(): void {
this.#activeCount = 0;
this.#queue.clear();
}
/**
* Adds a promise-returning or async function to the queue. Returns a promise
* that resolves when the function is finished executing, with respect to the
* concurrency limit. The promise rejects if the function throws an error or
* if the queue is ended before the function has a chance to execute.
*
* @param fn promise-returning or async function
* @param args arguments to pass to the function
* @returns a promise that resolves when the function is finished executing
*
* @example
* ```ts
* const list = new PLimit(1);
* const items = [];
* const runner = async (i: number) => {
* await new Promise((resolve) => setTimeout(resolve, 1000));
* items.push(
* `| #${(i+"").padEnd(3)} | Active: ${
* (list.activeCount + "").padEnd(3)
* } | Pending: ${(list.pendingCount+"").padEnd(3)} |`,
* );
* };
*
* await Promise.all([
* list.add(runner, 1),
* list.add(runner, 2),
* list.add(runner, 3),
* list.add(() => Promise.resolve("all done!")),
* ]);
*
* console.log(items.join("\n"));
* ```
*/
public add<
const A extends readonly unknown[],
const T = unknown,
>(
fn: PromiseLike<T> | ((...args: A) => T | PromiseLike<T>),
...args: A
): Promise<T> {
const { promise, resolve } = Promise.withResolvers<T>();
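// Capture the original value first: a closure over the reassigned `fn`
// would return the wrapper function itself instead of the promise.
const input = fn;
const task = typeof input === "function"
? input
: () => input as T | PromiseLike<T>;
this.#enqueue(task, resolve, args);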
return promise;
}
/**
* This method is an alias for {@link PLimit.add}.
*
* @param fn promise-returning or async function
* @param args arguments to pass to the function
* @returns a promise that resolves when the function is finished executing
*/
public limit<
const A extends readonly unknown[],
const T = unknown,
>(
fn: PromiseLike<T> | ((...args: A) => T | PromiseLike<T>),
...args: A
): Promise<T> {
return this.add(fn, ...args);
}
/**
* Drains the queue and returns a promise that resolves when all promises
* have finished executing. The promise rejects immediately if any of the
* promises reject. The queue is cleared after this method is called. The
* results are accumulated into an array and returned once the queue is
* drained. The returned array's elements are guaranteed to be in the same
* order as the functions that were added to the queue.
*
* @returns a promise that resolves to an aggregate array of the results that
* were yielded by each of the functions in the queue.
*/
public async drain<T = unknown>(): Promise<readonly T[]> {
const results: unknown[] = [];
try {
for await (const fn of this.#queue) {
const result = await fn();
results.push(result);
}
} finally {
this.clear();
}
return results as readonly T[];
}
/**
* Synchronously iterates over the queue, invoking and yielding the results
* of each function in the queue. The queue is cleared once the iteration is
* complete, regardless of whether the iteration is interrupted by an error.
*
* Unlike its asynchronous counterpart, this method does not accumulate the
* yielded results into an aggregate array. It simply yields the results as
* they are produced.
*
* This method is used internally by the semantics of the `for ... of` loop,
* and is not intended to be called directly.
*
* @category Iteration
*/
public *[Symbol.iterator](): IterableIterator<Promise<void>> {
try {
for (const fn of this.#queue) yield fn();
} finally {
this.clear();
}
}
/**
* Asynchronously iterates over the queue, invoking and yielding the results
* of each function in the queue. The queue is cleared once the iteration is
* complete, regardless of whether the iteration is interrupted by an error.
* Results are pushed to an array as they are yielded, and once the iteration
* is done the array is returned.
*
* This method is used internally by the semantics of the `for await ... of`
* loop, and is not intended to be called directly.
*
* @category Iteration
* @async
*/
public async *[Symbol.asyncIterator](): AsyncIterableIterator<
void | Promise<void>
> {
const results: unknown[] = [];
try {
for await (const fn of this.#queue) {
const result = await fn();
results.push(result);
yield result;
}
} finally {
this.clear();
}
return results;
}
/**
* Synchronously disposes of the queue's resources and clears the queue. This
* is a one-time operation, and once a queue has been disposed it cannot be
* used again. This method is called automatically by the semantics of the
* `using` statement from the Explicit Resource Management Proposal.
*
* @category Explicit Resource Management
*/
public [Symbol.dispose](): void {
if (!this.#disposed) {
this.#disposed = true;
this.clear();
}
}
/**
* Asynchronously disposes of the queue's resources and clears the queue.
* This is a one-time operation, and once a queue has been disposed it cannot
* be used again. This method is called automatically by the semantics of the
* `await using` statement from the Explicit Resource Management Proposal.
*
* @category Explicit Resource Management
*/
public async [Symbol.asyncDispose](): Promise<void> {
if (!this.#disposed) {
this.#disposed = true;
for await (const result of this) {
await result;
} // drain the queue
}
}
/** @internal */
declare public readonly [Symbol.toStringTag]: "PLimit";
// #endregion Public
// #region Private
#activeCount = 0;
#queue = new Queue<() => Promise<void>>();
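// Read `navigator` through `globalThis` so that runtimes without a global
// `navigator` fall back to 4 instead of throwing a ReferenceError.
#concurrency = globalThis.navigator?.hardwareConcurrency ?? 4;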
#disposed = false;
async #run<
const T,
const A extends readonly unknown[],
const R extends (value: T | PromiseLike<T>) => void,
>(fn: (...args: A) => T | PromiseLike<T>, resolve: R, args: A) {
this.#activeCount++;
const result = (async () => await fn(...args))();
resolve(result);
try {
await result;
} finally {
this.#next();
}
}
#enqueue<
const T,
const A extends readonly unknown[],
const R extends (value: T | PromiseLike<T>) => void,
>(fn: (...args: A) => T | PromiseLike<T>, resolve: R, args: A) {
this.#queue.enqueue(
this.#run.bind(
this,
// deno-lint-ignore no-explicit-any
fn.bind(this) as any,
resolve as (v: unknown) => void,
args,
),
);
(async () => {
// force microtask to ensure queue is processed
await Promise.resolve();
if (this.activeCount < this.concurrency && this.#queue.size > 0) {
this.#queue.dequeue()?.();
}
})();
return this;
}
#next() {
this.#activeCount--;
if (this.#queue.size > 0) this.#queue.dequeue()?.();
}
static {
const value = "PLimit";
Object.defineProperty(this.prototype, Symbol.toStringTag, {
configurable: true,
writable: false,
value,
});
Object.defineProperty(this, "name", { configurable: true, value });
}
// #endregion Private
// #region Static Methods
/**
* Creates a promise queue with concurrency limit. This method accepts an
* iterable of promises or promise-returning (async) functions, creates a new
* PLimit instance, wraps each item in the iterable with {@link PLimit.add},
* and finally returns the iterable wrapped with {@link Promise.all}.
*
* @see {@link PLimit.allSettled} for PLimit's version of `Promise.allSettled`
*
* This is a convenience method that is equivalent to the following:
* ```ts
* const limit = new PLimit(2);
* const items = [
* limit.add(() => Promise.resolve(1)),
* limit.add(() => Promise.resolve(2)),
* // ...
* ];
* const result = await Promise.all(items);
* ```
*
* Instead, you can now just write this:
* ```ts
* const result = await PLimit.all([
* () => Promise.resolve(1),
* () => Promise.resolve(2),
* // ...
* ], 2);
* ```
*
* @param promises array of promises or async functions that return promises
* @param [concurrency] maximum number of tasks to run concurrently (>= 1)
* @returns a promise that resolves when all wrapped promises have resolved, or rejects if any of the wrapped promises reject
*/
public static all<const T extends readonly unknown[] | []>(
promises: T,
concurrency?: number,
): Promise<
{
-readonly [P in keyof T]: Awaited<
// deno-lint-ignore no-explicit-any
T[P] extends (...args: any) => infer R ? R : T[P]
>;
}
>;
public static all<T>(
promises: ReadonlyArray<(() => T | PromiseLike<T>) | PromiseLike<T>>,
concurrency?: number,
): Promise<Awaited<T>[]>;
/**
* Creates a promise queue with concurrency limit. This method accepts an
* iterable of promises or promise-returning (async) functions, creates a new
* PLimit instance, wraps each item in the iterable with {@link PLimit.add},
* and finally returns the iterable wrapped with {@link Promise.all}.
*
* @param promises iterable of promises or async promise-returning functions
* @param [concurrency] maximum number of tasks to run concurrently (>= 1)
* @returns a promise that resolves when all wrapped promises have resolved, or rejects if any of the wrapped promises reject
*/
public static all<T>(
promises: Iterable<(() => T | PromiseLike<T>) | PromiseLike<T>>,
concurrency?: number,
): Promise<Awaited<T>[]>;
/**
* @template T type of the resolved value of each promise
* @param {Iterable<(() => T | PromiseLike<T>) | PromiseLike<T>>} promises
* iterable of promises or async promise-returning functions
* @param {number} [concurrency] maximum tasks to run concurrently (>= 1)
* @returns {Promise<Awaited<T>[]>} a promise that resolves when all wrapped
* promises have resolved, or rejects if any of the wrapped promises reject.
*/
public static all<T>(
promises: Iterable<(() => T | PromiseLike<T>) | PromiseLike<T>>,
concurrency?: number,
): Promise<Awaited<T>[]> {
const limit = new PLimit(concurrency);
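// Wrap `add` so the index that Array.from passes to its mapping function
// is not forwarded to the task as an argument.
return Promise.all(Array.from(promises, (task) => limit.add(task)));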
}
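/*
`all` builds its task list with `Array.from`, whose mapping function is
called with both the element and its index. The sketch below illustrates what
that means for a variadic callback such as `add`, which forwards extra
arguments to the task. `collect` is a hypothetical stand-in, not part of
this module.

```ts
// Hypothetical variadic callback, standing in for a method like `add`
// that forwards every extra argument it receives.
function collect(value: string, ...extra: unknown[]): unknown[] {
  return [value, ...extra];
}

// Passed directly, the callback also receives the element's index.
const direct = Array.from(["a", "b"], collect);
// Wrapped in an arrow function, only the element itself is forwarded.
const wrapped = Array.from(["a", "b"], (value) => collect(value));

console.log(JSON.stringify(direct)); // [["a",0],["b",1]]
console.log(JSON.stringify(wrapped)); // [["a"],["b"]]
```

Tasks that declare no parameters ignore the extra index, so the difference
only shows up for tasks with optional or rest parameters.
*/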
/**
* Creates a promise queue with concurrency limit. This method accepts an
* iterable of promises or promise-returning (async) functions, creates a new
* PLimit instance, wraps each item in the iterable with {@link PLimit.add},
* and finally returns the iterable wrapped with {@link Promise.allSettled}.
*
* @see {@link PLimit.all} for PLimit's version of `Promise.all`
*
* This is a convenience method that is equivalent to the following:
* ```ts
* const limit = new PLimit(2);
* const items = [
* limit.add(() => Promise.resolve(1)),
* limit.add(() => Promise.resolve(2)),
* // ...
* ];
* const result = await Promise.allSettled(items);
* ```
*
* Instead, you can now just write this:
* ```ts
* const result = await PLimit.allSettled([
* () => Promise.resolve(1),
* () => Promise.resolve(2),
* // ...
* ], 2);
* ```
*
* @param promises array of promises or async functions that return promises
* @param [concurrency] maximum number of tasks to run concurrently (>= 1)
* @returns a promise that resolves once every wrapped promise has settled, with an array of result objects describing each outcome; it does not reject when a wrapped promise rejects
*/
public static allSettled<const T extends readonly unknown[] | []>(
promises: T,
concurrency?: number,
): Promise<
{
-readonly [P in keyof T]: PromiseSettledResult<
// deno-lint-ignore no-explicit-any
Awaited<T[P] extends (...args: any) => infer R ? R : T[P]>
>;
}
>;
public static allSettled<T>(
promises: ReadonlyArray<(() => T | PromiseLike<T>) | PromiseLike<T>>,
concurrency?: number,
): Promise<PromiseSettledResult<Awaited<T>>[]>;
/**
* Creates a promise queue with concurrency limit. This method accepts an
* iterable of promises or promise-returning (async) functions, creates a new
* PLimit instance, wraps each item in the iterable with {@link PLimit.add},
* and finally returns the iterable wrapped with {@link Promise.allSettled}.
*
* @param promises iterable of promises or async promise-returning functions
* @param [concurrency] maximum number of tasks to run concurrently (>= 1)
* @returns a promise that resolves once every wrapped promise has settled, with an array of result objects describing each outcome; it does not reject when a wrapped promise rejects
*/
public static allSettled<T>(
promises: Iterable<(() => T | PromiseLike<T>) | PromiseLike<T>>,
concurrency?: number,
): Promise<PromiseSettledResult<Awaited<T>>[]>;
/**
* @template T type of the resolved value of each promise
* @param {Iterable<(() => T | PromiseLike<T>) | PromiseLike<T>>} promises iterable of promises or async promise-returning functions
* @param {number} [concurrency] maximum number of tasks to run concurrently (>= 1)
* @returns {Promise<PromiseSettledResult<Awaited<T>>[]>} a promise that resolves once every wrapped promise has settled, with an array of result objects describing each outcome; it does not reject when a wrapped promise rejects
*/
public static allSettled<T>(
promises: Iterable<(() => T | PromiseLike<T>) | PromiseLike<T>>,
concurrency?: number,
): Promise<PromiseSettledResult<Awaited<T>>[]> {
const limit = new PLimit(concurrency);
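// Wrap `add` so the index that Array.from passes to its mapping function
// is not forwarded to the task as an argument.
return Promise.allSettled(Array.from(promises, (task) => limit.add(task)));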
}
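/*
Because `allSettled` delegates to `Promise.allSettled`, a failing task shows
up as a `"rejected"` entry in the result array rather than as a rejection of
the returned promise. The sketch below uses only the built-in
`Promise.allSettled` to show the shape of those entries; `tasks` and
`summarize` are hypothetical names chosen for the example.

```ts
// Hypothetical tasks: one fulfils, one rejects.
const tasks: Array<() => Promise<number>> = [
  () => Promise.resolve(1),
  () => Promise.reject(new Error("boom")),
];

async function summarize(): Promise<string[]> {
  // Promise.allSettled waits for every promise, even after one rejects.
  const settled = await Promise.allSettled(tasks.map((task) => task()));
  return settled.map((entry) =>
    entry.status === "fulfilled"
      ? `fulfilled: ${entry.value}`
      : `rejected: ${(entry.reason as Error).message}`
  );
}

summarize().then((lines) => console.log(lines.join("\n")));
// fulfilled: 1
// rejected: boom
```

Callers that want fail-fast behaviour should reach for `all` instead.
*/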
// #endregion Static Methods
}
/**
* Creates a "limiter function" that can be used to limit the number of
* concurrent executions of a given function.
*
* The limiter function accepts a function and any number of arguments to pass
* to the function, and returns a promise that resolves when the function is
* finished executing, with respect to the {@link concurrency} limit. The
* promise rejects if the function throws an error or if the queue is ended
* before the function has a chance to execute.
*
* > **Note¹**: If your use case requires more control over the queue, you can
* create a new instance of {@link PLimit} and use {@link PLimit.add} instead.
*
* > **Note²**: this function is mainly here for convenience and to preserve
* the behavior of the NPM package `p-limit` this module is based on.
*
* @param concurrency maximum number of tasks to run concurrently (>= 1)
* @returns a "limiter function" that can be used to limit the {@link concurrency} of a given function's execution.
*/
export function pLimit(concurrency?: number): {
<const A extends readonly unknown[], T = unknown>(
fn: (this: PLimit, ...args: A) => T | PromiseLike<T>,
...args: A
): Promise<T>;
} {
const p = new PLimit(concurrency);
return bind(p.add, p) as typeof p.add;
}
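/*
To make the idea of a "limiter function" concrete, here is a minimal,
self-contained sketch of the general technique: count the running tasks and
park the rest until a slot frees up. It is an illustration under simplifying
assumptions, not the implementation used by `PLimit`; `createLimiter`,
`TaskFn`, `sleep`, and `job` are hypothetical names.

```ts
// Hypothetical stand-in for a limiter; not the implementation used by PLimit.
type TaskFn<T> = () => T | PromiseLike<T>;

function createLimiter(concurrency: number) {
  let active = 0;
  const waiting: Array<() => void> = [];

  // Free a slot and start the next waiting task, if there is one.
  const release = (): void => {
    active--;
    waiting.shift()?.();
  };

  const limit = <T>(task: TaskFn<T>): Promise<T> =>
    new Promise<T>((resolve, reject) => {
      const start = (): void => {
        active++;
        // Running the task inside then() also captures synchronous throws.
        Promise.resolve().then(task).then(resolve, reject).finally(release);
      };
      if (active < concurrency) start();
      else waiting.push(start);
    });

  return {
    limit,
    activeCount: (): number => active,
    pendingCount: (): number => waiting.length,
  };
}

// Usage: four jobs, at most two of them running at any moment.
const limiter = createLimiter(2);
const sleep = (ms: number) => new Promise<void>((done) => setTimeout(done, ms));
let running = 0;
let peak = 0;
const job = async (id: number): Promise<number> => {
  running++;
  peak = Math.max(peak, running);
  await sleep(10);
  running--;
  return id;
};
const jobs = [1, 2, 3, 4].map((id) => limiter.limit(() => job(id)));
// Resolves to [1, 2, 3, 4] in input order; peak never exceeds 2.
Promise.all(jobs).then((ids) => console.log(ids.join(","), "peak:", peak));
```

Results keep the order in which the tasks were submitted because each call
returns its own promise, regardless of when the task actually starts.
*/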
class Node<T> {
constructor(
public value: T,
public next: Node<T> | undefined = undefined!,
) {}
}
/**
* Lightweight queue implementation.
*
* @author Nicholas Berlette <https://github.com/nberlette>
* @see https://github.com/sindresorhus/yocto-queue
*/
export class Queue<T> {
#head: Node<T> | undefined = undefined;
#tail: Node<T> | undefined = undefined;
#size = 0;
public get size(): number {
return this.#size;
}
public enqueue(value: T): this {
const node = new Node(value);
if (this.#head) {
this.#tail!.next = this.#tail = node;
} else {
this.#head = this.#tail = node;
}
this.#size++;
return this;
}
public dequeue(): T | undefined {
const current = this.#head;
if (current) {
this.#head = this.#head?.next;
this.#size--;
return current.value;
}
}
public clear(): void {
this.#head = this.#tail = undefined;
this.#size = 0;
}
public *[Symbol.iterator](): IterableIterator<T> {
let current = this.#head;
while (current) {
yield current.value;
current = current.next;
}
}
public toArray(): T[] {
return [...this];
}
public toJSON(): T[] {
return this.toArray();
}
public toString(): string {
return this.toArray().toString();
}
}
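/*
The chained assignment in `enqueue` (`this.#tail!.next = this.#tail = node`)
depends on JavaScript resolving the left-hand target before evaluating the
right-hand side. The sketch below reproduces that evaluation order with plain
objects; `ListNode`, `first`, `second`, and `tail` are hypothetical names
used only for this illustration.

```ts
// Hypothetical two-node list, used only to illustrate evaluation order.
interface ListNode {
  value: string;
  next?: ListNode;
}

const first: ListNode = { value: "first" };
const second: ListNode = { value: "second" };
let tail: ListNode = first;

// The target `tail.next` is resolved against the old tail (first) before
// the right-hand side `tail = second` rebinds the variable.
tail.next = tail = second;

console.log(first.next === second); // true
console.log(tail === second); // true
console.log(second.next === undefined); // true
```

In other words, the old tail is linked to the new node and the tail pointer
advances in a single statement.
*/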
export default Queue;