Skip to content

Instantly share code, notes, and snippets.

@jmendiara
Last active December 18, 2020 01:03
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jmendiara/bbbe0b338cc310b92189e3bd30eb9316 to your computer and use it in GitHub Desktop.
Save jmendiara/bbbe0b338cc310b92189e3bd30eb9316 to your computer and use it in GitHub Desktop.
Concurrent TaskQueue with lifecycle notification in typescript (alike a Promise.map with concurrency and continue on failure)
import { TaskQueue, TaskSuccessEvent, TaskErrorEvent, QueueStartEvent, QueueCompleteEvent } from "/taskqueue";
// Define several task with optional titles
const firstTask = () => new Promise<void>((resolve, reject) => setTimeout(resolve, 1000));
firstTask.title = '1 second timeout';
const secondTask = () => new Promise<void>((resolve, reject) => setTimeout(() => reject(new Error('boom!')), 1000));
secondTask.title = '1 second failed timeout';
const thirdTask = () => new Promise<void>((resolve, reject) => setTimeout(resolve, 3000));
thirdTask.title = '3 second timeout';
// Create the queue and add tasks
const queue = new TaskQueue('Optional queue name');
queue.push(firstTask);
queue.push(secondTask);
queue.push(thirdTask);
// get notificaions of progress
queue.on('taskSuccess', ({ task, time }: TaskSuccessEvent) =>
console.debug(`[${String(time).padStart(6)}ms] OK ${queue.name}: ${task.title}`)
);
queue.on('taskError', ({ task, time }: TaskErrorEvent) =>
console.error(`[${String(time).padStart(6)}ms] KO ${queue.name}: ${task.title}`)
);
queue.once('start', ({ concurrency, size }: QueueStartEvent) =>
console.info(
`[ START ] ${queue.name} with ${size} task (${concurrency} in parallel)`
)
);
queue.once('complete', ({ time }: QueueCompleteEvent) =>
console.info(`[COMPLETE] ${queue.name} completed in ${time}ms`)
);
async function run() {
// Run the queue with concurrency.
try {
await queue.run(2); // 2 simultaneous tasks as a time
} catch(err) {
console.error(err);
}
}
run()
[ START ] Optional queue name with 3 tasks (2 in parallel)
[ 1002ms] OK Optional queue name: 1 second timeout
[ 1003ms] KO Optional queue name: 1 second failed timeout
[ 3005ms] OK Optional queue name: 3 second timeout
[COMPLETE] Optional queue name completed in 4009ms
Error: Optional queue name ended with 1 errors:
1 second failed timeout: boom!
at TaskQueue.<anonymous> (/opt/app/src/taskqueue.ts:199:18)
at Object.onceWrapper (events.js:313:26)
at TaskQueue.emit (events.js:228:7)
at TaskQueue.EventEmitter.emit (domain.js:475:20)
at TaskQueue.complete (/opt/app/src/taskqueue.ts:139:10)
at TaskQueue.runTask (/opt/app/src/taskqueue.ts:163:12)
import { EventEmitter } from 'events';
/**
* Asyncronous execution task
*/
export interface Task {
/** async function executing a task */
(): Promise<void>;
/** optional title for the task. Good to set for better trazability */
title?: string;
}
/**
* Emmited when a task has ended, successfully or not
*/
export interface TaskEndEvent {
/** the task executed */
task: Task;
/** time (in ms) the task took to complete */
time: number;
}
/**
* Emmited when a task has succeed
*/
export interface TaskSuccessEvent {
/** the task executed */
task: Task;
/** time (in ms) the task took to complete */
time: number;
}
/**
* Emmited when a task has errored
*/
export interface TaskErrorEvent {
/** the task executed */
task: Task;
/** time (in ms) the task took to complete */
time: number;
}
/**
* Emmited when the queue completes
*/
export interface QueueCompleteEvent {
/** errors raised during the execution */
errors?: Error[];
/** time (in ms) the queue took to complete */
time: number;
}
/**
* Emmited when the queue starts
*/
export interface QueueStartEvent {
/** The concurrency used for running the queue */
concurrency: number;
/** number of tasks in the queue */
size: number;
}
/**
* Simple Task Queue to make notificable concurrent tasks, with continue-on-failure
*
* notifies about the queue lifecycle with the events: `start`, `complete`,
* and the tasks lifecycle with `taskStart`, `taskSuccess`, `taskError`, `taskEnd`
*/
export class TaskQueue extends EventEmitter {
/** the queue */
protected pending: Task[] = [];
private errors = [];
private running = 0;
private runStart: number;
constructor(public name = 'TaskQueue') {
super();
}
private complete() {
const event: QueueCompleteEvent = {
time: Date.now() - this.runStart,
};
if (this.errors.length !== 0) {
event.errors = this.errors;
}
this.runStart = null;
this.emit('complete', event);
}
/** consumes the queue runnin tasks */
private async runTask(task: Task) {
this.running++;
const start = Date.now();
try {
this.emit('taskStart', { task });
await task();
this.emit('taskSuccess', { task, time: Date.now() - start });
} catch (error) {
// store and forget
this.emit('taskError', { error, task, time: Date.now() - start });
const err = new Error(`${task.title ?? task.name}: ${error?.message ?? error}`);
this.errors.push(err);
}
this.emit('taskCompleted', { task, time: Date.now() - start });
this.running--;
const arePendingTasks = this.pending.length > 0;
if (arePendingTasks) {
this.runTask(this.pending.shift());
} else if (this.running === 0) {
this.complete();
}
}
/**
* Adds a task to the queue
*
* @param task The task to run
*/
public push(task: Task): void {
this.pending.push(task);
}
/**
* Runs the queue.
*
* A failing task does not end the execution, but is stored to late notification.
*
* Rejects with a simple error with a message with an error abstract. For more detailed errors, you can subscribe
* to `complete` event.
*
* @param concurrency the concurrency to execute task. Not providing this parameter will run all the tasks at once
*/
public async run(concurrency?: number): Promise<void> {
if (this.runStart) {
throw new Error('The queue is already running');
}
concurrency = concurrency ?? this.pending.length;
if (concurrency <= 0) {
throw new Error('Invalid concurrency');
}
this.runStart = Date.now();
return new Promise((resolve, reject) => {
this.once('complete', ({ errors }) => {
if (errors) {
const msgs = [
`${this.name} ended with ${errors.length} errors:`,
...errors.map((err) => err.message),
];
reject(new Error(msgs.join('\n ')));
} else {
resolve();
}
});
this.emit('start', { concurrency, size: this.pending.length });
if (this.pending.length === 0) {
this.complete();
}
this.pending
.splice(0, concurrency )
.forEach((task) => this.runTask(task));
});
}
}
@jmendiara
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment