Created
December 28, 2016 17:18
-
-
Save pwang2/509a38388965fa8e5cb7d6d2836f553d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
var fork = require('child_process').fork, | |
guid = require('mout/random/guid'), | |
log = require('quikr-log')(), | |
_ = require('underscore'), | |
Promise = require('bluebird'), | |
cpus = require('os').cpus(), | |
path = require('path'), | |
QUEUE_LIMIT = 32, | |
DEFAULT_TIMEOUT = 20000; //ms | |
/** | |
* QueuedCluster: A customized cluster message balancer, | |
* worker status are tracked and when new message comming, | |
* it will always send to the idle worker to process. | |
* when no idle worker, a random worker is picked | |
* @constructor | |
* @param worker file to fork in cluster | |
* @param concurrentNo optional concurrent number to start cluster | |
* @param args args for child_process.fork | |
* @param options options for child_process.fork | |
* @param timeout time allowance for a message processing, | |
* once excess, worker will be killed | |
* @return {undefined} | |
* */ | |
function QueuedCluster(worker, concurrentNo, args, options, timeout) { | |
this.worker = worker; | |
this.workerShort = path.dirname(worker).split(path.sep).pop(); | |
this.args = args; | |
this.options = options; | |
this.timeout = timeout || DEFAULT_TIMEOUT; | |
this.queue = []; | |
this.taskQueue = {}; | |
this.pool = {}; | |
this.status = 'pending'; | |
this.concurrentNo = concurrentNo || process.env.QUEUE_CLUSTER_WORKER_COUNT || cpus.length / 2; | |
var that = this; | |
this.interval = setInterval(function() { | |
that.scanPool(); | |
}, 1000); | |
} | |
/** | |
* scan worker pool to see is anyone is dead, | |
* kill timeout job and put message attached with the killed process in queue for next processing | |
* | |
* @return {undefined} | |
*/ | |
QueuedCluster.prototype.scanPool = function() { | |
/*jshint maxcomplexity:5*/ | |
var now = new Date(); | |
for (var pid in this.pool) { | |
if (this.pool.hasOwnProperty(pid)) { | |
var w = this.pool[pid]; | |
if (w.occupied === 1 && now - w.lastServeBy > this.timeout) { | |
log.info('force quit! ', w.currentMessage); | |
w.worker.kill("SIGTERM"); | |
this.enqueue(w.currentMessage.item); | |
} | |
} | |
} | |
}; | |
/** | |
* fired when a worker has finished the processing work. | |
* in this method, a few scan is processed: | |
* - remove it from message queue | |
* - remove it from the task contains the current message | |
* - release the process used to process current message | |
* - update the promise resolve value with current message's process result; | |
* - remove the task current message belongs to from task queue when no more message attached | |
* - find new message ready to be processed and trigger scheduling (TODO: may not need) | |
* @return {undefined} | |
*/ | |
QueuedCluster.prototype.updateQueue = function(m) { | |
/*jshint maxcomplexity:8*/ | |
var that = this; | |
var task = that.taskQueue[m.taskGuid], | |
msgLen = task.taskMsgs.length, | |
queueLen = that.queue.length, | |
i; | |
//that.queue keeps the reference of m | |
//when updateQueue(m), remove it from queue | |
for (i = 0; i < queueLen; i++) { | |
var qmsg = that.queue[i]; | |
if (m.guid === qmsg.guid) { | |
that.queue.splice(i, 1); | |
break; | |
} | |
} | |
//task.taskMsg also preserves m reference. | |
//remove it from task as well.a | |
for (i = 0; i < msgLen; i++) { | |
var tmsg = task.taskMsgs[i]; | |
if (m.guid === tmsg.guid) { | |
task.taskMsgs.splice(i, 1); | |
break; | |
} | |
} | |
that.pool[m.pid].occupied = 0; | |
task.resolveValue.push(m); | |
if (task.taskMsgs.length === 0) { | |
//Promise magic: task finished, fulfil promise of enqueue | |
task.resolve(task.resolveValue); | |
delete that.taskQueue[m.taskGuid]; | |
} | |
//find if still any message ready to sent to queue | |
var next = _.find(that.queue, function(item) { | |
return !item.inqueue; | |
}); | |
if (next) { | |
that.queuedScheduler(next); | |
} | |
}; | |
/** | |
* create worker for cluster fromt the file to fork | |
* | |
* @return {undefined} | |
*/ | |
QueuedCluster.prototype.createWorker = function() { | |
var that = this; | |
var p = fork(that.worker, that.args, that.options); | |
log.debug(that.workerShort, 'worker ' + p.pid + ' created'); | |
that.pool[p.pid] = { | |
worker: p, | |
occupied: 0 | |
}; | |
//in worker, we use process.send to send message to here. | |
//here the handler grab the message and emit it to the cluster | |
p.on('message', function(m) { | |
if (m.status !== 'okay') { | |
log.warn('process failed!\n' + JSON.stringify(m, null, 4)); | |
} | |
that.updateQueue(m); | |
}); | |
return p; | |
}; | |
/** | |
* start the cluster by forking concurrentNo of worker | |
* @return {undefined} | |
*/ | |
QueuedCluster.prototype.start = function() { | |
if (this.status === 'started') { | |
return; | |
} | |
var that = this; | |
var iter = _.range(that.concurrentNo); | |
_.each(iter, function() { | |
var worker = that.createWorker(); | |
worker.on('exit', function(code, signal) { | |
if (that.connected) { | |
delete that.pool[worker.pid]; | |
log.warn('💣 worker %d died (%s).', worker.pid, code || signal); | |
log.info('restarting....'); | |
var newWorker = that.createWorker(); | |
log.info('🔧 recreating worker ' + newWorker.pid); | |
} | |
}); | |
}); | |
this.status = 'started'; | |
return this; | |
}; | |
/** | |
* disconnect all worker | |
* @return {undefined} | |
*/ | |
QueuedCluster.prototype.disconnect = function() { | |
clearInterval(this.interval); | |
this.connected = false; | |
log.debug('disconnect ' + this.worker); | |
_.each(this.pool, function(w) { | |
log.debug('disconnect worker process ', w.worker.pid); | |
w.worker.kill(); | |
}); | |
}; | |
/** | |
* getIdleWorkers: get idle worker in the pool | |
* | |
* @return {undefined} | |
*/ | |
QueuedCluster.prototype.getIdleWorkers = function() { | |
var idles = []; | |
_.each(this.pool, function(w) { | |
if (w.occupied === 0) { | |
idles.push(w); | |
} | |
}); | |
return idles; | |
}; | |
/** | |
* enQueue: append message to processing queue | |
* | |
* @param messages | |
* @return {undefined} | |
*/ | |
QueuedCluster.prototype.enqueue = function(messages) { | |
/*jshint maxcomplexity:5*/ | |
if (this.queue.length > QUEUE_LIMIT) { | |
log.warn(this.queue.length + ' items in queue, think about scaling'); | |
} | |
//no message to process | |
//return default promise. | |
if (!messages || messages.length === 0) { | |
return new Promise(function(resolve) { | |
resolve([]); | |
}); | |
} | |
var that = this; | |
var taskMsgs = Array.isArray(messages) ? messages : [messages]; | |
var taskGuid = guid(); | |
var cookedMessages = _.map(taskMsgs, function(m) { | |
var cookedMessage = {}; | |
cookedMessage.item = m; | |
cookedMessage.taskGuid = taskGuid; | |
cookedMessage.guid = guid(); | |
//push cooled message to message queue. | |
that.queue.push(cookedMessage); | |
//tell queueScheduler to process this message | |
that.queuedScheduler(cookedMessage); | |
return cookedMessage; | |
}); | |
//creat a batch task for messages group passed in | |
//only resolve when all messages in the taskQueue has been processed | |
//the resolver and rejector are stored in the taskQueue object for later resolve. | |
var p = new Promise(function(resolve, reject) { | |
var context = this; | |
that.taskQueue[taskGuid] = { | |
taskMsgs: cookedMessages, | |
resolveValue: [], | |
resolve: resolve.bind(context), | |
reject: reject.bind(context) | |
}; | |
}); | |
return p; | |
}; | |
/** | |
* queuedScheduler: for a specific message, find an idleWorker to work on it | |
* update the worker id and inqueue status in message as well. | |
* | |
* @param m message to process | |
* @return {undefined} | |
*/ | |
QueuedCluster.prototype.queuedScheduler = function(m) { | |
var idleWorkers = this.getIdleWorkers(); | |
if (idleWorkers.length === 0) { | |
log.warn('all workers are busy, waiting for next chance'); | |
return; | |
} | |
var w = idleWorkers[0]; | |
w.occupied = 1; | |
w.lastServeBy = new Date(); | |
m.pid = w.worker.pid; | |
m.inqueue = true; | |
w.currentMessage = m; | |
w.worker.send(m); | |
log.info(this.workerShort, '[', m.pid, '] start processing'); | |
}; | |
module.exports = QueuedCluster; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment