Skip to content

Instantly share code, notes, and snippets.

@ardeshireshghi
Created August 25, 2020 18:41
Show Gist options
  • Save ardeshireshghi/4eebfa964e7d660e5ddd41af8649c9d4 to your computer and use it in GitHub Desktop.
Save ardeshireshghi/4eebfa964e7d660e5ddd41af8649c9d4 to your computer and use it in GitHub Desktop.
Runs async/promise tasks concurrently (in Node.js event-loop terms) with retries and a configurable concurrency limit
const EventEmitter = require('events');
const logUpdate = require('log-update');
/**
 * Runs a queue of asynchronous tasks with a cap on how many are in flight at
 * once, re-queueing failed tasks up to a retry limit.
 *
 * A task is either:
 *   - a function returning a promise (or a plain value) — it is invoked only
 *     when a concurrency slot is free and invoked again on every retry; or
 *   - a promise/thenable — accepted as before, but since the underlying work
 *     was started by the caller, the runner can only observe it: a retry
 *     simply waits on the same promise again.
 *
 * Events:
 *   - 'complete' (completed): emitted each time the queue has drained and no
 *     task is in flight; receives the same array as getCompleted().
 *   - 'failed' (task, err): emitted when a task has used up its retries.
 */
class ConcurrentTaskRunner extends EventEmitter {
  /**
   * @param {object} [options]
   * @param {Array<Function|Promise>} [options.tasks=[]] Initial queue.
   * @param {number} [options.concurrency=5] Maximum tasks in flight at once.
   * @param {number} [options.retries=3] Maximum re-runs per failed task.
   * @param {boolean} [options.debug=false] Draw queue state via logUpdate.
   */
  constructor({
    tasks = [],
    concurrency = 5,
    retries = 3,
    debug = false
  } = {}) {
    super();
    this._concurrency = concurrency;
    this._inProgress = [];
    this._complete = [];
    // Copy so that dequeuing never drains the array the caller passed in.
    this._tasks = [...tasks];
    this._retries = retries;
    this._debug = debug;
  }

  /**
   * Starts queued tasks until the concurrency limit is reached or the queue
   * is empty. Called again internally whenever a task settles.
   */
  run() {
    while (this._shouldRun()) {
      const task = this.dequeue();
      this._inProgress.push(task);
      this._graphTasks();

      const attempt = this._start(task);
      // Two-argument then(): an exception thrown while handling a success
      // (for example by a 'complete' listener) must not be mistaken for a
      // task failure and sent down the retry path.
      attempt.then(
        () => this._onSuccess(task, attempt),
        (err) => this._onFailure(task, err)
      );
    }
  }

  /**
   * Appends one task or an array of tasks to the queue and starts as many as
   * the concurrency limit allows.
   *
   * @param {Function|Promise|Array<Function|Promise>} tasks
   */
  enqueue(tasks) {
    const newTasks = Array.isArray(tasks) ? tasks : [tasks];
    this._tasks.push(...newTasks);
    this.run();
  }

  /**
   * Removes and returns the task at the head of the queue.
   *
   * @returns {Function|Promise|undefined} undefined when the queue is empty.
   */
  dequeue() {
    return this._tasks.shift();
  }

  /**
   * Returns the live list of successful tasks. For a promise task the entry
   * is the task itself; for a function task it is the fulfilled promise of
   * the successful attempt, so `Promise.all(runner.getCompleted())` yields
   * the results.
   *
   * @returns {Array<Promise>}
   */
  getCompleted() {
    return this._complete;
  }

  /** True while there is a free concurrency slot and a queued task. */
  _shouldRun() {
    return (
      this._inProgress.length < this._concurrency && this._tasks.length > 0
    );
  }

  /** Starts one attempt of a task and normalizes the outcome to a promise. */
  _start(task) {
    try {
      return Promise.resolve(typeof task === 'function' ? task() : task);
    } catch (err) {
      // A task function that throws synchronously counts as a failed attempt.
      return Promise.reject(err);
    }
  }

  /** Records a successful attempt, then refills the free slot. */
  _onSuccess(task, attempt) {
    // Free the slot before run() is called again; otherwise the settled task
    // still counts against the concurrency limit and the queue can stall.
    this._release(task);
    this._complete.push(typeof task === 'function' ? attempt : task);
    this._graphTasks();
    this._next();
  }

  /** Re-queues a failed task while it has retries left, else reports it. */
  _onFailure(task, err) {
    this._release(task);
    const used = task.retries || 0;

    if (used < this._retries) {
      console.error(`Task failed with ${err}, retry number ${used + 1}`);
      // The retry count lives on the task object, as it always has.
      task.retries = used + 1;
      this._tasks.push(task);
      this._graphTasks();
      this._next();
      return;
    }

    console.error(`Task failed with ${err}, giving up after ${used} retries`);
    this._graphTasks();
    try {
      this.emit('failed', task, err);
    } finally {
      // Keep the queue moving even if a 'failed' listener throws.
      this._next();
    }
  }

  /** Removes a task from the in-progress list if it is present. */
  _release(task) {
    const index = this._inProgress.indexOf(task);
    if (index !== -1) {
      this._inProgress.splice(index, 1);
    }
  }

  /** Starts more work, and emits 'complete' once nothing is left anywhere. */
  _next() {
    this.run();
    if (this._tasks.length === 0 && this._inProgress.length === 0) {
      this.emit('complete', this.getCompleted());
    }
  }

  /** In debug mode, redraws one 'X' per queued/in-progress/completed task. */
  _graphTasks() {
    if (!this._debug) {
      return;
    }
    const { _complete, _tasks, _inProgress } = this;
    logUpdate(`
Tasks: [${_tasks.map(() => 'X')}]
In progress: [${_inProgress.map(() => 'X')}]
Completed: [${_complete.map(() => 'X')}]
`);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment