Skip to content

Instantly share code, notes, and snippets.

@intrnl
Last active Dec 19, 2020
Embed
What would you like to do?
// Helpers this file expects to be in scope (not defined here; signatures only):
// class Queue<V = any>
// function createDeferred<V> (): Deferred<V>
// function sleep (ms: number): Promise<void>
/**
 * Runs tasks with bounded concurrency and automatic retries.
 *
 * Tasks are started in the order they were added. At most
 * `options.concurrency` tasks occupy a slot at once. A task whose generator
 * throws or rejects is run again until it succeeds or `options.retry`
 * attempts have been made, at which point its promise is rejected with the
 * last error.
 */
export class PromiseQueue {
  /** Tasks that have been added but not started yet. */
  queue = new Queue<Task<any>>();

  /** Effective settings; every field is populated by the constructor. */
  options: Required<PromiseQueueOptions>;

  /** Number of tasks currently occupying a concurrency slot. */
  ongoing = 0;

  /**
   * @param opts Overrides for the defaults (`paused: false`, `concurrency: 3`,
   *   `concurrencyTimeout: 0`, `retry: 3`, `retryTimeout: 250`). A field that
   *   is missing or explicitly `undefined` keeps its default.
   */
  constructor (opts: PromiseQueueOptions = {}) {
    // Spreading `opts` alone would let an explicit `undefined` overwrite a
    // default, which removes the concurrency limit (`ongoing >= undefined` is
    // always false) and leaves tasks unsettled (`0 < undefined` is false).
    this.options = {
      ...opts,
      paused: opts.paused ?? false,
      concurrency: opts.concurrency ?? 3,
      concurrencyTimeout: opts.concurrencyTimeout ?? 0,
      retry: opts.retry ?? 3,
      retryTimeout: opts.retryTimeout ?? 250,
    };
  }

  /** Number of tasks waiting in the queue (tasks already started are not counted). */
  get pending (): number {
    return this.queue.size;
  }

  /**
   * Enqueues a task and starts it immediately if a slot is free.
   *
   * @param generator Function producing the task's value. It is called once
   *   per attempt, so it must be safe to call again after a failure.
   * @returns A promise that resolves with the first successful result, or
   *   rejects with the error of the last allowed attempt.
   */
  add<V> (generator: () => Promisable<V>): Promise<V> {
    const task: Task<V> = { ...createDeferred<V>(), generator };
    this.queue.enqueue(task);
    void this._next();
    return task.promise;
  }

  /**
   * Empties the queue of tasks that have not started; running tasks continue.
   *
   * NOTE: this class neither resolves nor rejects the promises of the tasks
   * it drops here, so (assuming `Queue.clear()` discards its entries) callers
   * awaiting them will keep waiting.
   */
  clear (): void {
    this.queue.clear();
  }

  /** Stops new tasks from being started. Tasks already running are not interrupted. */
  pause (): void {
    this.options.paused = true;
  }

  /** Lifts a pause and fills every free concurrency slot from the queue. */
  resume (): void {
    this.options.paused = false;
    const freeSlots = this.options.concurrency - this.ongoing;
    for (let i = 0; i < freeSlots; i++) void this._next();
  }

  /**
   * Internal pump: takes one task from the queue, if allowed, and runs it to
   * completion (including retries), then tries to start the next one.
   *
   * Does nothing when the queue is paused, all slots are taken, or the queue
   * is empty. Options are read once on entry, so changes made while a task is
   * running apply from the next task onwards.
   */
  async _next (): Promise<void> {
    const { paused, concurrency, concurrencyTimeout, retry, retryTimeout } = this.options;
    if (paused || this.ongoing >= concurrency) return;

    const task = this.queue.dequeue();
    if (!task) return;

    // Always make at least one attempt: with `retry` of 0, a negative number
    // or NaN the task would otherwise be dequeued and never settled.
    const maxAttempts = retry >= 1 ? Math.floor(retry) : 1;

    this.ongoing++;
    try {
      for (let attempt = 1; ; attempt++) {
        try {
          task.resolve(await task.generator());
          break;
        } catch (err) {
          if (attempt >= maxAttempts) {
            task.reject(err);
            break;
          }
        }
        // Only reached after a failed attempt that will be retried.
        if (retryTimeout) await sleep(retryTimeout);
      }

      // Hold the slot a little longer to space out consecutive tasks.
      if (concurrencyTimeout) await sleep(concurrencyTimeout);
    } finally {
      // Release the slot even if something above threw unexpectedly.
      this.ongoing--;
    }

    void this._next();
  }
}
/** Either a plain value of type `V` or a thenable that resolves to one. */
export type Promisable<V> = V | PromiseLike<V>;
/**
 * Settings accepted by the `PromiseQueue` constructor. Every field is
 * optional; the defaults quoted below are the ones the constructor applies.
 * Delays are handed to the `sleep(ms)` helper, i.e. they are in milliseconds.
 */
export interface PromiseQueueOptions {
/** While true, the queue does not start new tasks. Defaults to `false`. */
paused?: boolean,
/** Maximum number of tasks allowed to occupy a slot at the same time. Defaults to `3`. */
concurrency?: number,
/** Delay a finished task waits before releasing its slot; `0` skips the wait. Defaults to `0`. */
concurrencyTimeout?: number,
/** Maximum number of attempts per task before its promise is rejected. Defaults to `3`. */
retry?: number,
/** Delay between a failed attempt and the next one; `0` skips the wait. Defaults to `250`. */
retryTimeout?: number,
}
/**
 * A queued unit of work: a `Deferred<V>` (whose `promise`, `resolve` and
 * `reject` members `PromiseQueue` uses to report the outcome) plus the
 * function that produces the value.
 */
export interface Task<V> extends Deferred<V> {
/** Produces the task's value; `PromiseQueue` calls it again for each retry. */
generator (): Promisable<V>,
}
/**
 * Minimal queue contract: add a value, take one out, or empty the queue.
 *
 * NOTE(review): `PromiseQueue` also reads a `size` member from its queue,
 * which this interface does not declare — confirm whether it should.
 */
export interface BasicQueue<V> {
/** Adds `value` to the queue. */
enqueue (value: V): void,
/** Removes and returns a value, or `undefined` when none is available. */
dequeue (): V | undefined,
/** Empties the queue. */
clear (): void,
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment