Last active
October 19, 2018 17:49
-
-
Save halfak/5e0c4951c52f57ff3320aff9b51b757e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Construct a thread pool using nWorkers.
 *
 * The "threads" are cooperative workers started with setTimeout; each one
 * pulls tasks off a shared FIFO queue until the queue is empty.
 *
 * @constructor
 * @param {int} [nWorkers] The number of worker threads
 */
var ThreadPool = function(nWorkers) {
  this.activeWorkers = 0;
  this.nWorkers = nWorkers;
  this.taskQueue = [];
  // This Deferred is used purely as a signal: submit() calls notify(), which
  // fires the progress callback and (re)starts workers as needed.
  this.tasksQueued = $.Deferred()
    .progress(this.ensureWorkers.bind(this));
};

ThreadPool.prototype = {
  /**
   * Submit a job to be processed. Returns a jQuery.Promise object for the
   * result of the function call.
   *
   * The promise is resolved with the function's return value, or -- when the
   * function returns a thenable -- settled the same way that thenable settles.
   * It is rejected with the thrown error if the function throws.
   *
   * @param {function} [func] The function to run
   * @param {Array} [args] Arguments to pass to the function
   * @returns {jQuery.Promise} Promise for the task's result
   */
  submit: function(func, args) {
    // The parameter is deliberately not called `arguments`: that name is a
    // SyntaxError in strict mode and shadows the built-in arguments object.
    var resultDfd = $.Deferred();
    this.taskQueue.push({func: func, arguments: args, resultDfd: resultDfd});
    this.tasksQueued.notify();
    console.debug("Job submitted. Queue size is " + this.taskQueue.length);
    return resultDfd.promise();
  },

  /**
   * Clear all tasks from the queue.
   *
   * Only tasks that have not been picked up by a worker are dropped. The
   * promises already handed out for dropped tasks are never settled.
   */
  clear: function() {
    this.taskQueue = [];
  },

  /**
   * Process tasks until the queue is empty, then shut the worker down.
   *
   * Synchronous tasks are drained in a loop. When a task returns a thenable
   * the worker pauses and resumes once that thenable settles.
   *
   * @param {int} threadIndex Identifier of this worker (used for logging)
   */
  processTasks: function(threadIndex) {
    var pool = this;
    var task, result;

    // Iterate instead of recursing so that a long run of synchronous tasks
    // cannot grow the call stack.
    while ((task = this.taskQueue.shift())) {
      console.debug("Worker #" + threadIndex + " is processing a task");
      try {
        result = task.func.apply(null, task.arguments);
      } catch (error) {
        // The task threw. Reject its promise and move on to the next task.
        task.resultDfd.reject(error);
        continue;
      }

      if (result && typeof result.then === "function") {
        // Result is a promise! Mirror its outcome onto our own Deferred and
        // resume this worker afterwards. Only then() is used so that jQuery
        // promises and other thenables are both supported.
        (function(dfd) {
          result.then(
            function() {
              dfd.resolve.apply(dfd, arguments);
              pool.processTasks(threadIndex);
            },
            function() {
              dfd.reject.apply(dfd, arguments);
              pool.processTasks(threadIndex);
            }
          );
        })(task.resultDfd);
        return;
      }

      // Result is not a promise. Resolve immediately and keep draining.
      task.resultDfd.resolve(result);
    }

    // Queue is empty: shut down worker and decrement the worker count.
    console.debug("Shutting down worker #" + threadIndex);
    this.activeWorkers -= 1;
  },

  /**
   * Ensure that the workers are running. This is intended to be called
   * any time tasks are added to the queue.
   */
  ensureWorkers: function() {
    while (this.activeWorkers < this.nWorkers) {
      // bind() captures the current value of this.activeWorkers, so each
      // worker keeps its own index even though the counter keeps changing.
      setTimeout(this.processTasks.bind(this, this.activeWorkers), 0);
      console.debug("Starting up worker #" + this.activeWorkers);
      this.activeWorkers++;
    }
  }
};
// Create a thread pool with 4 workers and submit 100 tasks.
// Each task waits a random 0-100ms and then reports how long it waited and
// how long it sat in the queue before finishing.
var tp = new ThreadPool(4);
var i;
for (i = 0; i < 100; i++) {
  tp.submit(function(wait, submitted) {
    var resultDfd = $.Deferred();
    setTimeout(function() {
      // `wait` is in milliseconds; divide by 1000 so the value matches the
      // "s" unit printed below.
      resultDfd.resolve(wait / 1000, (Date.now() - submitted) / 1000);
    }, wait);
    return resultDfd.promise();
  }, [Math.random() * 100, Date.now()])
    .done(function(waited, delayed) {
      console.log("Delayed " + delayed + "s to wait " + waited + "s");
    });
}

// This still works (in a degraded fashion) when the function does not return a promise.
// Nothing is actually waited on here: the task runs synchronously and returns
// one array value, so the callback takes that array as a single argument.
for (i = 0; i < 100; i++) {
  tp.submit(function(wait, submitted) {
    return [wait / 1000, (Date.now() - submitted) / 1000];
  }, [Math.random() * 100, Date.now()])
    .done(function(result) {
      console.log("Delayed " + result[1] + "s to wait " + result[0] + "s");
    });
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment