Last active
December 18, 2020 01:03
-
-
Save jmendiara/bbbe0b338cc310b92189e3bd30eb9316 to your computer and use it in GitHub Desktop.
Concurrent TaskQueue with lifecycle notifications in TypeScript (like Promise.map with a concurrency limit that continues on failure)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { TaskQueue, TaskSuccessEvent, TaskErrorEvent, QueueStartEvent, QueueCompleteEvent } from "/taskqueue"; | |
// Define several tasks; the optional `title` is what the log lines below print.
const firstTask = () => new Promise<void>((resolve) => setTimeout(resolve, 1000));
firstTask.title = '1 second timeout';
// This one rejects on purpose to show that the queue keeps going after a failure.
const secondTask = () =>
  new Promise<void>((_resolve, reject) => setTimeout(() => reject(new Error('boom!')), 1000));
secondTask.title = '1 second failed timeout';
const thirdTask = () => new Promise<void>((resolve) => setTimeout(resolve, 3000));
thirdTask.title = '3 second timeout';

// Create the queue and add the tasks in the order they should be started
const queue = new TaskQueue('Optional queue name');
queue.push(firstTask);
queue.push(secondTask);
queue.push(thirdTask);

// Get notifications of progress
queue.on('taskSuccess', ({ task, time }: TaskSuccessEvent) =>
  console.debug(`[${String(time).padStart(6)}ms] OK ${queue.name}: ${task.title}`)
);
queue.on('taskError', ({ task, time }: TaskErrorEvent) =>
  console.error(`[${String(time).padStart(6)}ms] KO ${queue.name}: ${task.title}`)
);
// `start` and `complete` are emitted once per run, so a one-shot listener is enough here
queue.once('start', ({ concurrency, size }: QueueStartEvent) =>
  console.info(
    `[ START ] ${queue.name} with ${size} tasks (${concurrency} in parallel)`
  )
);
queue.once('complete', ({ time }: QueueCompleteEvent) =>
  console.info(`[COMPLETE] ${queue.name} completed in ${time}ms`)
);
/**
 * Drains the example queue and logs the aggregated error when any task failed.
 */
async function run(): Promise<void> {
  // At most 2 tasks run at the same time. The promise returned by the queue
  // rejects when at least one task failed, so report it instead of letting it escape.
  await queue.run(2).catch((err: unknown) => {
    console.error(err);
  });
}
run()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[ START ] Optional queue name with 3 tasks (2 in parallel) | |
[ 1002ms] OK Optional queue name: 1 second timeout | |
[ 1003ms] KO Optional queue name: 1 second failed timeout | |
[ 3005ms] OK Optional queue name: 3 second timeout | |
[COMPLETE] Optional queue name completed in 4009ms | |
Error: Optional queue name ended with 1 errors: | |
1 second failed timeout: boom! | |
at TaskQueue.<anonymous> (/opt/app/src/taskqueue.ts:199:18) | |
at Object.onceWrapper (events.js:313:26) | |
at TaskQueue.emit (events.js:228:7) | |
at TaskQueue.EventEmitter.emit (domain.js:475:20) | |
at TaskQueue.complete (/opt/app/src/taskqueue.ts:139:10) | |
at TaskQueue.runTask (/opt/app/src/taskqueue.ts:163:12) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { EventEmitter } from 'events'; | |
/**
 * Asynchronous unit of work that can be pushed into a queue.
 */
export interface Task {
  /** Optional human-readable title; setting it makes logs and error reports easier to trace. */
  title?: string;
  /** Executes the task; the returned promise settles when the work has finished. */
  (): Promise<void>;
}
/**
 * Payload emitted when a task has ended, whether it succeeded or failed.
 */
export interface TaskEndEvent {
  /** Time, in milliseconds, the task took to end. */
  time: number;
  /** The task that was executed. */
  task: Task;
}
/**
 * Payload emitted when a task has finished successfully.
 */
export interface TaskSuccessEvent {
  /** Time, in milliseconds, the task took to complete. */
  time: number;
  /** The task that was executed. */
  task: Task;
}
/**
 * Payload emitted when a task has failed.
 */
export interface TaskErrorEvent {
  /**
   * The value the task threw or rejected with. Typed as `unknown` because a
   * task may reject with anything, not only `Error` instances.
   */
  error: unknown;
  /** The task that was executed. */
  task: Task;
  /** Time, in milliseconds, the task ran before failing. */
  time: number;
}
/**
 * Payload emitted when the queue has completed a run.
 */
export interface QueueCompleteEvent {
  /** Time, in milliseconds, the queue took to complete. */
  time: number;
  /** Errors raised during the execution; absent when no error was recorded. */
  errors?: Error[];
}
/**
 * Payload emitted when the queue starts a run.
 */
export interface QueueStartEvent {
  /** Number of tasks in the queue when the run starts. */
  size: number;
  /** Concurrency used for running the queue. */
  concurrency: number;
}
/**
 * Simple task queue that runs asynchronous tasks with bounded concurrency and
 * keeps going when a task fails.
 *
 * Queue lifecycle events: `start`, `complete`.
 * Task lifecycle events: `taskStart`, `taskSuccess`, `taskError`, `taskEnd`.
 * `taskCompleted` is still emitted right after `taskEnd` with the same kind of
 * payload, as an alias for listeners written against the previous event name.
 */
export class TaskQueue extends EventEmitter {
  /** Tasks waiting to be started, consumed in FIFO order. */
  protected pending: Task[] = [];
  /** Errors recorded from failed tasks during the current run. */
  private errors: Error[] = [];
  /** Number of tasks currently executing. */
  private running = 0;
  /** Timestamp (ms) at which the current run started; `undefined` while idle. */
  private runStart: number | undefined;

  /**
   * @param name Name of the queue, used as the prefix of the error reported by `run`
   */
  constructor(public name = 'TaskQueue') {
    super();
  }

  /** Ends the current run: resets the run state and emits `complete`. */
  private complete(): void {
    const startedAt = this.runStart ?? Date.now();
    const event: QueueCompleteEvent = {
      time: Date.now() - startedAt,
    };
    if (this.errors.length !== 0) {
      event.errors = this.errors;
    }
    // Start the next run with a fresh array: otherwise errors from this run
    // would make every later run fail, and the array handed to the listeners
    // would keep changing under them.
    this.errors = [];
    // Reset before emitting so a `complete` listener may call `run` again.
    this.runStart = undefined;
    this.emit('complete', event);
  }

  /**
   * Runs one task and then either starts the next pending task or, when
   * nothing is pending or running, completes the queue.
   *
   * @param task The task to execute
   */
  private async runTask(task: Task): Promise<void> {
    this.running++;
    const start = Date.now();
    try {
      // NOTE: listeners run synchronously inside `emit`, so an exception thrown
      // by a `taskStart`/`taskSuccess` listener is recorded as a task failure too.
      this.emit('taskStart', { task });
      await task();
      this.emit('taskSuccess', { task, time: Date.now() - start });
    } catch (error: unknown) {
      // Store and carry on: a failing task must not stop the queue.
      this.emit('taskError', { error, task, time: Date.now() - start });
      // Tasks may reject with non-Error values; use `message` when there is one.
      const detail = (error as { message?: unknown } | null | undefined)?.message ?? error;
      this.errors.push(new Error(`${task.title ?? task.name}: ${String(detail)}`));
    }
    this.emit('taskEnd', { task, time: Date.now() - start });
    // Previous name of the `taskEnd` event, kept for existing listeners.
    this.emit('taskCompleted', { task, time: Date.now() - start });
    this.running--;

    const next = this.pending.shift();
    if (next !== undefined) {
      // Reuse the slot that was just freed, which keeps the concurrency constant.
      void this.runTask(next);
    } else if (this.running === 0) {
      this.complete();
    }
  }

  /**
   * Adds a task to the queue
   *
   * @param task The task to run
   */
  public push(task: Task): void {
    this.pending.push(task);
  }

  /**
   * Runs the queue.
   *
   * A failing task does not end the execution; its error is stored and
   * reported once every task has ended. Running an empty queue completes
   * immediately.
   *
   * @param concurrency Maximum number of tasks executing at the same time.
   *   When omitted, all the tasks are started at once.
   * @returns A promise resolved when all the tasks have ended without errors.
   *   It rejects with a single error whose message summarizes every task
   *   failure; subscribe to the `complete` event to get the individual errors.
   * @throws (as a rejection) when the queue is already running or when
   *   `concurrency` is not a positive number.
   */
  public async run(concurrency?: number): Promise<void> {
    if (this.runStart !== undefined) {
      throw new Error('The queue is already running');
    }
    // With no explicit value every task starts at once. The lower bound of 1
    // keeps an empty queue from being rejected as an invalid concurrency.
    const limit = concurrency ?? Math.max(this.pending.length, 1);
    // Written as a negated comparison so that NaN is rejected as well: it
    // would start no task at all and leave the returned promise pending forever.
    if (!(limit > 0)) {
      throw new Error('Invalid concurrency');
    }
    this.runStart = Date.now();

    return new Promise<void>((resolve, reject) => {
      this.once('complete', ({ errors }: QueueCompleteEvent) => {
        if (errors) {
          const msgs = [
            `${this.name} ended with ${errors.length} errors:`,
            ...errors.map((err) => err.message),
          ];
          reject(new Error(msgs.join('\n ')));
        } else {
          resolve();
        }
      });

      this.emit('start', { concurrency: limit, size: this.pending.length });

      if (this.pending.length === 0) {
        this.complete();
        return;
      }
      // Start the first batch; each task pulls the next pending one when it ends.
      for (const task of this.pending.splice(0, limit)) {
        void this.runTask(task);
      }
    });
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
published in https://github.com/jmendiara/foratata