Skip to content

Instantly share code, notes, and snippets.

@ryanvazquez
Created December 11, 2019 02:39
Show Gist options
  • Save ryanvazquez/018269c5e7c807c57fe608525235fede to your computer and use it in GitHub Desktop.
Save ryanvazquez/018269c5e7c807c57fe608525235fede to your computer and use it in GitHub Desktop.
An async task queue
class TaskQueue {
constructor(config = {}){
this.config = Object.assign({
concurrency: 1,
rejectOnError: true,
timeout: 0
}, config);
this.remaining = 0;
this.running = 0;
this.tasks = [];
this.results = [];
this.deferred = {};
}
_drain(){
this.tasks = [];
}
_addTask(task, callback){
this.tasks.push(task);
this.remaining ++;
this._processNextTask(callback);
}
async _processNextTask(callback){
if (this.running === this.config.concurrency && this.remaining === 0 && this.config.timeout > 0){
return setTimeout(this._processNextTask.bind(this), this.config.timeout, callback)
}
if (this.remaining === 0 && this.running === 0){
return this.deferred.resolve(this.results)
}
while (this.running < this.config.concurrency && this.tasks.length){
const task = this.tasks.shift();
let result;
this.running ++;
try {
result = await callback(task);
} catch (err) {
if (this.config.rejectOnError){
this._drain();
return this.deferred.reject(err);
}
result = err;
}
this.running --;
this.remaining --;
this.results.push(result);
this._processNextTask(callback);
}
}
defer(callback){
setImmediate(callback);
return new Promise((resolve, reject) => {
this.deferred.resolve = resolve;
this.deferred.reject = reject;
});
}
map(urls, callback){
const deferred = this.defer(() => urls.forEach(url => this._addTask(url, callback)));
return deferred;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment