Last active
March 2, 2016 12:37
-
-
Save usefulthink/6e019cf04fb175ceb93e to your computer and use it in GitHub Desktop.
Quick and tiny implementation of a promise-based worker-pool in ES6
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Creates a worker URL from an existing function.
 *
 * The source text of the passed function (obtained via `toString()`) is embedded
 * into a generated worker script, so the function runs in the worker global scope
 * and cannot access variables from the scope it was originally defined in.
 * Whenever the worker receives a message, the function is called with two
 * arguments: the message data and a callback. The function has to invoke the
 * callback with any resulting data to send that data back to the UI thread.
 *
 * The function's source text must be usable as an expression (a function
 * expression/declaration or an arrow function).
 *
 * @param {Function} onMessage - handler invoked as `onMessage(data, callback)` inside the worker.
 * @returns {string} an object URL pointing to the generated worker script. The
 *   caller owns the URL and may release it with `URL.revokeObjectURL()`.
 * @throws {TypeError} if `onMessage` is not a function.
 */
function createWorkerUrl(onMessage) {
  // Anything else would be stringified into the script and only fail later,
  // inside the worker, where the error is much harder to trace.
  if (typeof onMessage !== 'function') {
    throw new TypeError('createWorkerUrl: onMessage must be a function');
  }

  const code = `
    const onMessage = ${onMessage.toString()};
    self.onmessage = function(ev) {
      onMessage(ev.data, result => {
        self.postMessage(result);
      });
    };
  `;

  // Declare the MIME type so the blob is identified as a script.
  const blob = new Blob([code], {type: 'text/javascript'});
  return URL.createObjectURL(blob);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Generic preforked worker-pool.
 *
 * All workers are created up front from the same script URL. Workers must handle
 * messages exactly as they are passed to `run()` and must respond with a single
 * message containing the result. A worker is marked busy (via an `isBusy`
 * property set on the worker object) and will not receive further tasks until
 * that response arrived.
 */
class WorkerPool {
  /**
   * @param {string} url - script URL every worker of the pool is created from.
   * @param {number} size - number of workers to create.
   */
  constructor(url, size) {
    this.url = url;
    this.workers = [];
    this.jobQueue = [];
    this.dispatchTimeout = null;

    for (let i = 0; i < size; i++) {
      this.workers.push(new Worker(url));
    }
  }

  /**
   * Sends the message and returns a promise that will resolve with the result
   * from the worker. The promise rejects with the error event if the worker
   * reports an error while processing the job, or with the thrown error if the
   * message could not be posted to the worker.
   * @param message
   * @returns {Promise}
   */
  run(message) {
    const promise = new Promise((resolve, reject) => {
      this.jobQueue.push({message, onComplete: resolve, onError: reject});
    });

    this.scheduleDispatch();

    return promise;
  }

  /**
   * Schedules the next queue-run. As tasks are usually submitted in batches,
   * dispatching them is postponed; at most one run is pending at any time.
   * @private
   */
  scheduleDispatch() {
    if (this.dispatchTimeout !== null) {
      return;
    }

    this.dispatchTimeout = setTimeout(() => {
      this.dispatchTimeout = null;
      this.dispatch();
    }, 0);
  }

  /**
   * Checks for jobs to dispatch and sends them to idle workers.
   * @private
   */
  dispatch() {
    const idleWorkers = this.workers.filter(worker => !worker.isBusy);

    // dispatch n jobs to workers
    const n = Math.min(idleWorkers.length, this.jobQueue.length);
    for (let i = 0; i < n; i++) {
      const worker = idleWorkers[i];
      const job = this.jobQueue.shift();

      // setup the worker
      worker.isBusy = true;
      worker.onmessage = (event) => {
        job.onComplete(event.data);
        this.resetWorker(worker);
        this.scheduleDispatch();
      };
      worker.onerror = (errorEvent) => {
        job.onError(errorEvent);
        // respawn the worker on error
        this.replaceWorker(worker);
        this.scheduleDispatch();
      };

      // dispatch the job
      try {
        worker.postMessage(job.message);
      } catch (error) {
        // The message never reached the worker (e.g. it could not be cloned):
        // settle the job and free the worker, otherwise the promise would stay
        // pending and the worker would stay busy forever.
        this.resetWorker(worker);
        job.onError(error);
        this.scheduleDispatch();
      }
    }
  }

  /**
   * Terminates a failed worker and puts a freshly created one into its slot.
   * The slot is looked up by identity because the position of a worker in the
   * list of idle workers does not match its position in `this.workers`.
   * @param worker - the worker to replace.
   * @private
   */
  replaceWorker(worker) {
    this.resetWorker(worker);
    worker.terminate();

    const index = this.workers.indexOf(worker);
    if (index !== -1) {
      this.workers[index] = new Worker(this.url);
    }
  }

  /**
   * Marks the worker as idle and detaches the handlers of the finished job.
   * @param worker - the worker to reset.
   * @private
   */
  resetWorker(worker) {
    // reset the worker-state
    worker.isBusy = false;
    worker.onmessage = null;
    worker.onerror = null;
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment