Skip to content

Instantly share code, notes, and snippets.

@halfak
Last active October 19, 2018 17:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save halfak/5e0c4951c52f57ff3320aff9b51b757e to your computer and use it in GitHub Desktop.
Save halfak/5e0c4951c52f57ff3320aff9b51b757e to your computer and use it in GitHub Desktop.
/**
* Construct a thread pool using nWorkers.
*
* @constructor
* @param {int} [nWorkers] The number of worker threads
*/
var ThreadPool = function(nWorkers) {
this.activeWorkers = 0;
this.nWorkers = nWorkers
this.taskQueue = []
this.tasksQueued = $.Deferred()
.progress(this.ensureWorkers.bind(this))
}
ThreadPool.prototype = {
/**
* Submit a job to be processed. Returns a jQuery.Promise object for the
* result of the function call.
*
* @param {function} [func] The function to run
* @param {Array} [arguments] Arguments to pass to the function
*/
submit: function(func, arguments) {
var resultDfd = $.Deferred(),
task = {func: func, arguments: arguments, resultDfd: resultDfd};
this.taskQueue.push(task);
this.tasksQueued.notify();
console.debug("Job submitted. Queue size is " + this.taskQueue.length);
return resultDfd.promise();
},
/**
* Clear all tasks from the queue
*/
clear: function() {
this.taskQueue = [];
},
/**
* Process a task and recurse if there are tasks to process. Or shut down.
*/
processTasks: function(threadIndex){
// Get a task
task = this.taskQueue.shift()
// If there's a task on the queue, process it.
if (task) {
console.debug("Worker #" + threadIndex + " is processing a task");
try {
result = task.func.apply(null, task.arguments)
} catch(error){
// An error occurred. Note the failure and recurse.
task.resultDfd.fail(error)
this.processTasks(threadIndex);
return
}
if (result && result.then) {
// Result is a promise! Pass on our own Deferred methods.
result
.done(task.resultDfd.resolve)
.fail(task.resultDfd.error)
.always(function(){this.processTasks(threadIndex)}.bind(this))
} else {
// Result is not a promise. Just be done and recurse.
task.resultDfd.done(result)
this.processTasks(threadIndex);
return
}
} else {
// shut down worker and decrement the worker count
console.debug("Shutting down worker #" + threadIndex);
this.activeWorkers -= 1;
}
},
/**
* Ensure that the workers are running. This is intended to be called
* any time tasks are added to the queue.
*/
ensureWorkers: function(){
while (this.activeWorkers < this.nWorkers) {
// Note that this.activeWorkers must be passed to setTimeout in order
// to preserve pass-by-value.
setTimeout(
function(i){this.processTasks(i)}.bind(this), 0, this.activeWorkers);
console.debug("Starting up worker #" + this.activeWorkers);
this.activeWorkers++;
}
}
}
// Create a thread pool with 4 workers and submit 100 tasks
tp = new ThreadPool(4)
for ( var i = 0; i < 100; i++) {
tp.submit(function(wait, submitted){
var resultDfd = $.Deferred();
setTimeout(function(){resultDfd.resolve(wait/100, (Date.now()-submitted)/1000)}, wait)
return resultDfd.promise();
}, [Math.random()*100, Date.now()])
.done(function(waited, delayed){
console.log("Delayed " + delayed + "s to wait " + waited + "s")});
}
// This still works (in a degraded fashion) when the function does not return a promise.
for ( var i = 0; i < 100; i++) {
tp.submit(function(wait, submitted){
var resultDfd = $.Deferred();
return [wait/100, (Date.now()-submitted)/1000]
}, [Math.random()*100, Date.now()])
.done(function(waited, delayed){
console.log("Delayed " + delayed + "s to wait " + waited + "s")});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment