Created
February 12, 2016 16:40
-
-
Save usefulthink/8d6d85693ac08f1744a4 to your computer and use it in GitHub Desktop.
Quick and tiny implementation of a promise-based worker-pool in ES6
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * More or less generic, minimal static worker-pool implementation.
 *
 * To avoid any per-job bookkeeping, workers must accept tasks exactly as they
 * are passed to `run()` and must respond with a single message carrying the
 * result.
 */
class WorkerPool {
  /**
   * Creates the pool and eagerly spawns all of its workers.
   *
   * @param {string|URL} url - script location handed to every `new Worker(url)`.
   * @param {number} size - number of workers to create. The pool never grows,
   *   so with a size of 0 queued jobs are never dispatched.
   */
  constructor(url, size) {
    this.workers = [];
    this.jobQueue = [];
    this.dispatchTimeout = null;

    for (let i = 0; i < size; i++) {
      this.workers.push(new Worker(url));
    }
  }

  /**
   * Queues the message and returns a promise that will resolve with the
   * result from the worker. The promise rejects when the worker reports an
   * error, when its reply cannot be deserialized, or when the message itself
   * cannot be posted to the worker.
   *
   * @param {*} message - payload passed unchanged to `worker.postMessage()`.
   * @returns {Promise} settles with the `data` of the worker's reply.
   */
  run(message) {
    const promise = new Promise((resolve, reject) => {
      this.jobQueue.push({ message, onComplete: resolve, onError: reject });
    });

    this.scheduleDispatch();

    return promise;
  }

  /**
   * Schedules the next queue-run. As tasks are usually submitted in batches,
   * dispatching them is postponed to a timer so that one run can hand out the
   * whole batch.
   * @private
   */
  scheduleDispatch() {
    // a run is already pending; it will pick up everything queued until then
    if (this.dispatchTimeout) {
      return;
    }

    this.dispatchTimeout = setTimeout(() => {
      this.dispatchTimeout = null;
      this.dispatch();
    }, 0);
  }

  /**
   * Checks for jobs to dispatch and sends them to idle workers.
   * @private
   */
  dispatch() {
    const idleWorkers = this.workers.filter((worker) => !worker.isBusy);

    // hand out as many jobs as there are idle workers (or queued jobs)
    const n = Math.min(idleWorkers.length, this.jobQueue.length);

    for (let i = 0; i < n; i++) {
      const worker = idleWorkers[i];
      const job = this.jobQueue.shift();

      // settles the job, frees the worker and looks for more work
      const finish = (settle, value) => {
        settle(value);
        this.resetWorker(worker);
        this.scheduleDispatch();
      };

      // setup the worker
      worker.isBusy = true;
      worker.onmessage = (event) => finish(job.onComplete, event.data);
      worker.onerror = (errorEvent) => finish(job.onError, errorEvent);
      // the reply could not be deserialized: the job will never get a result
      worker.onmessageerror = (event) => finish(job.onError, event);

      // dispatch the job
      try {
        worker.postMessage(job.message);
      } catch (error) {
        // postMessage() throws synchronously for messages that cannot be
        // cloned. Without this guard the worker would stay busy forever, the
        // job would never settle and the rest of this batch would be skipped.
        finish(job.onError, error);
      }
    }
  }

  /**
   * Marks the worker as idle again and detaches the per-job handlers.
   * @param {Worker} worker
   * @private
   */
  resetWorker(worker) {
    // reset the worker-state
    worker.isBusy = false;
    worker.onmessage = null;
    worker.onerror = null;
    worker.onmessageerror = null;
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment