Skip to content

Instantly share code, notes, and snippets.

@Deepak13245
Created June 2, 2020 21:02
Show Gist options
  • Save Deepak13245/865cc54dc2975834ab16c22296d724dc to your computer and use it in GitHub Desktop.
Save Deepak13245/865cc54dc2975834ab16c22296d724dc to your computer and use it in GitHub Desktop.
Manager implementation for async workers
class WorkerManager {
_started = false;
_workers = [];
constructor(concurrency, fn, data = []) {
this._channel = new Channel(data);
this._concurrency = concurrency;
this._fn = fn;
this._workers = this._createWorkers(concurrency);
}
get length() {
return this._workers.length;
}
get results() {
return this._channel.results;
}
_createWorker() {
const worker = new Worker(this._channel, this._fn);
worker.setOnStop(this._onStop);
return worker;
}
_createWorkers(num) {
return range(num)
.map(() => this._createWorker())
}
_startWorkers(workers) {
workers.forEach(w => {
w.start();
})
return workers;
}
_onStop = worker => {
this._workers = this._workers.filter(w =>w!==worker);
}
start() {
this._startWorkers(this._workers);
}
enqueue(...items) {
const remaining = this._concurrency - this.length;
const max = Math.min(items.length, remaining);
this._channel.push(...items);
if(max !== 0) {
this._workers.push(
...this._startWorkers(
this._createWorkers(max),
)
);
}
}
async finish(waitTime = 500) {
return new Promise(resolve => {
const interval = setInterval(() => {
if(this.length === 0) {
clearInterval(interval);
resolve(this.results);
}
}, waitTime);
})
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment