Skip to content

Instantly share code, notes, and snippets.

@usefulthink
Last active March 2, 2016 12:37
Show Gist options
  • Save usefulthink/6e019cf04fb175ceb93e to your computer and use it in GitHub Desktop.
Save usefulthink/6e019cf04fb175ceb93e to your computer and use it in GitHub Desktop.
Quick and tiny implementation of a promise-based worker-pool in ES6
/**
* creates a worker url from an existing function.
* The passed function will run in the worker global scope and will be called with two
* arguments - the message-data and a callback - whenever the worker receives a message.
* The function has to invoke the callback with any resulting data to send data back to
* the UI-Thread.
*/
function createWorkerUrl(onMessage) {
const code = `
const onMessage = ${onMessage.toString()};
self.onmessage = function(ev) {
onMessage(ev.data, result => {
self.postMessage(result);
});
};
`;
const blob = new Blob([code]);
return URL.createObjectURL(blob);
}
/**
* Generic preforked worker-pool.
* Workers must handle messages exactly as they are passed to `run()` and must respond with a single message
* with the result. A worker is marked busy and will not receive further tasks
* until this happened.
*/
class WorkerPool {
constructor(url, size) {
this.url = url;
this.workers = [];
this.jobQueue = [];
this.dispatchTimeout = null;
for (let i=0; i<size; i++) {
this.workers.push(new Worker(url));
}
}
/**
* Sends the message and returns a promise that will resolve with the result
* from the worker.
* @param message
* @returns {Promise}
*/
run(message) {
const promise = new Promise((resolve, reject) => {
this.jobQueue.push({message, onComplete: resolve, onError: reject});
});
this.scheduleDispatch();
return promise;
}
/**
* schedules the next queue-run. As Tasks are usually submitted in batches,
* dispatching them is postponed.
* @private
*/
scheduleDispatch() {
if (!this.dispatchTimeout) {
this.dispatchTimeout = setTimeout(() => {
this.dispatchTimeout = null;
this.dispatch();
}, 0);
}
}
/**
* Checks for jobs to dispatch and sends them to idle workers.
* @private
*/
dispatch() {
const idleWorkers = this.workers.filter(worker => !worker.isBusy);
// dispatch n jobs to workers
const n = Math.min(idleWorkers.length, this.jobQueue.length);
for (let i=0; i<n; i++) {
const worker = idleWorkers.shift();
const job = this.jobQueue.shift();
// setup the worker
worker.isBusy = true;
worker.onmessage = (event) => {
job.onComplete(event.data);
this.resetWorker(worker);
this.scheduleDispatch();
};
worker.onerror = (errorEvent) => {
job.onError(errorEvent);
// respawn the worker on error
worker.terminate();
this.workers[i] = new Worker(this.url);
this.scheduleDispatch();
};
// dispatch the job
worker.postMessage(job.message);
}
}
resetWorker(worker) {
// reset the worker-state
worker.isBusy = false;
worker.onmessage = null;
worker.onerror = null;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment