Created
July 15, 2014 04:04
-
-
Save camspiers/075e2e22ca3d7a261d3e to your computer and use it in GitHub Desktop.
Basic worker pool for nodejs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var spawn = require( 'child_process' ).spawn; | |
var fs = require( 'fs' ); | |
var Promise = require( 'promise' ); | |
var glob = Promise.denodeify( require( 'glob' ) ); | |
var readFile = Promise.denodeify( fs.readFile ); | |
var pidDir = __dirname + '/../pids/'; | |
var WORKER_LISTEN_TIMEOUT = 10; | |
/**
 * Creates a pool of worker processes with respawning and graceful exit.
 *
 * Constructing a pool immediately spawns every worker and registers two
 * handlers on the current process:
 *  - SIGTERM: kill all workers, then exit the current process.
 *  - SIGHUP: replace the workers one at a time through workerSwap().
 *
 * @param {number} numberOfWorkers Number of workers the pool manages
 * @param {string} binary Executable handed to spawn for each worker
 * @param {Array} command Arguments for the executable ('--id' and the worker id are appended per worker)
 * @param {string} identifier Prefix of the pid file names written for this pool
 * @constructor
 */
function WorkerPool( numberOfWorkers, binary, command, identifier ) {
    this.workers = [];
    this.num = numberOfWorkers;
    this.bin = binary;
    this.cmd = command;
    this.identifier = identifier;

    this.spawn();

    process.on( 'SIGTERM', function() {
        console.log( 'SIGTERM received, gracefully shutting down workers' );
        this.exit();
        process.exit();
    }.bind( this ) );

    process.on( 'SIGHUP', function() {
        console.log( 'SIGHUP received, killing workers and restarting them' );

        // Remove the auto-restart on the worker, otherwise every worker killed
        // by the swap would also be respawned by its own 'exit' listener
        this.workers.forEach( function( worker ) {
            worker.removeAllListeners( 'exit' );
        } );

        this.workerSwap();
    }.bind( this ) );
}
/**
 * Replaces the workers one after another, starting at the given id.
 *
 * For each id the current worker (if any) is killed, a new worker is started
 * and the swap only moves on to the next id once the new worker has reported
 * that it is listening. If a new worker fails to report in time the whole
 * process exits with a non-zero status.
 *
 * @param {number} [id] Index of the worker to swap first, defaults to 0
 */
WorkerPool.prototype.workerSwap = function ( id ) {
    id = id || 0;

    // Nothing to swap for an empty pool or an id beyond the pool size
    if ( id >= this.num ) {
        return;
    }

    console.log( 'Swapping worker ' + id );

    // The slot may be empty, in which case there is nothing to kill
    var current = this.workers[ id ];
    if ( current ) {
        this.killWorker( current );
    }

    this.startWorker( id, true ).then(
        function ( worker ) {
            console.log( 'Worker ' + id + ' listening' );
            this.addWorker( id, worker );

            var next = id + 1;
            if ( next < this.num ) {
                this.workerSwap( next );
            }
        }.bind( this ),
        function ( err ) {
            // Failed
            console.log( 'Worker ' + id + ' Failed to listen, exiting process' );
            if ( err ) {
                console.log( err );
            }
            // Non-zero status so a supervisor can tell this from a clean exit
            process.exit( 1 );
        }
    );
};
/**
 * Builds the path of the pid file belonging to a worker of this pool.
 *
 * The file lives in the module's pid directory and is named
 * "<identifier>-<id>.pid".
 *
 * @param id Worker id
 * @returns {string} Path of the pid file
 */
WorkerPool.prototype.getPidPath = function ( id ) {
    var fileName = this.identifier + '-' + id + '.pid';

    return pidDir + fileName;
};
/**
 * Writes the pid of a worker into the worker's pid file.
 *
 * @param worker Child process carrying the `id` and `pid` properties
 */
WorkerPool.prototype.createPid = function ( worker ) {
    // writeFileSync only accepts strings and binary data, and pid is a number
    fs.writeFileSync( this.getPidPath( worker.id ), String( worker.pid ) );
};
/**
 * Removes the pid file of a worker.
 *
 * A pid file that no longer exists is not treated as an error, so killing
 * several workers in a row is not interrupted by one missing file. Any other
 * failure is rethrown.
 *
 * @param worker Child process carrying the `id` property
 */
WorkerPool.prototype.deletePid = function ( worker ) {
    try {
        fs.unlinkSync( this.getPidPath( worker.id ) );
    } catch ( err ) {
        // The file being gone already is the state we want to end up in
        if ( err.code !== 'ENOENT' ) {
            throw err;
        }
    }
};
/**
 * Starts a worker with a given id.
 *
 * The worker is spawned with the pool's binary and command plus "--id <id>",
 * receives its id in the `id` environment variable, shares the parent's
 * stdio and gets an IPC channel. A pid file is written for it, and an 'exit'
 * listener is attached that starts a replacement worker under the same id.
 *
 * @param id Worker id
 * @param ensureListening When truthy, a promise is returned instead of the worker
 * @returns {*} The child process, or (with ensureListening) a promise that
 *              resolves with the child process once it sends the message
 *              'listening' and rejects with an Error after
 *              WORKER_LISTEN_TIMEOUT seconds without that message
 */
WorkerPool.prototype.startWorker = function ( id, ensureListening ) {
    console.log( 'Starting worker ' + id );

    // Hand the id to the child through a copy of the environment so that the
    // parent's own process.env is left untouched
    var env = {};
    Object.keys( process.env ).forEach( function ( key ) {
        env[ key ] = process.env[ key ];
    } );
    env.id = String( id );

    var worker = spawn(
        this.bin,
        this.cmd.concat( '--id', String( id ) ),
        { stdio: [0, 1, 2, 'ipc'], env: env }
    );
    worker.id = id;

    // Auto-restart: a worker that exits is replaced by a new one with the same id
    worker.on( 'exit', function () {
        console.log( 'Worker ' + id + ' exited, restarting worker');
        this.addWorker( id, this.startWorker( id, false ) );
    }.bind( this ) );

    this.createPid( worker );

    if ( !ensureListening ) {
        return worker;
    }

    return new Promise( function ( resolve, reject ) {
        var timer = setTimeout( function () {
            reject( new Error(
                'Worker ' + id + ' did not report listening within ' +
                WORKER_LISTEN_TIMEOUT + ' seconds'
            ) );
        }, 1000 * WORKER_LISTEN_TIMEOUT );

        worker.on( 'message', function ( data ) {
            if ( data === 'listening' ) {
                // Do not leave the timeout pending once the worker is up
                clearTimeout( timer );
                resolve( worker );
            }
        } );
    } );
};
/**
 * Stores a worker in the pool's worker list at the position given by its id,
 * replacing whatever was stored there before.
 *
 * @param id Index in the worker list
 * @param worker Value to store (whatever startWorker returned or resolved with)
 */
WorkerPool.prototype.addWorker = function ( id, worker ) {
    var slots = this.workers;

    slots[ id ] = worker;
};
/**
 * Starts one worker for every id from 0 up to (but excluding) the pool size
 * and stores each of them in the worker list.
 *
 * startWorker is called without the ensureListening flag, so the workers are
 * stored without waiting for them to report that they are listening.
 */
WorkerPool.prototype.spawn = function () {
    var id = 0;

    while ( id < this.num ) {
        var started = this.startWorker( id );
        this.addWorker( id, started );
        id += 1;
    }
};
/**
 * Gracefully exit: kills every worker of the pool and empties the worker list.
 *
 * The 'exit' listeners of each worker are removed before it is killed.
 * startWorker attaches an 'exit' listener that starts a replacement worker,
 * so without this step every worker killed here would be respawned unless
 * the process itself exits right afterwards.
 */
WorkerPool.prototype.exit = function () {
    this.workers.forEach( function( worker ) {
        // Remove the auto-restart on the worker before killing it
        worker.removeAllListeners( 'exit' );
        this.killWorker( worker );
    }.bind( this ) );

    this.workers = [];
};
/**
 * Kill a worker and remove its pid file.
 *
 * Calling this without a worker (for example for an empty slot of the worker
 * list) does nothing.
 *
 * @param worker Child process to kill; may be undefined
 */
WorkerPool.prototype.killWorker = function ( worker ) {
    if ( !worker ) {
        return;
    }

    worker.kill();
    this.deletePid( worker );
};
/**
 * Reads the pid files of all workers of the pool with the given identifier.
 *
 * Pid files are named "<identifier>-<id>.pid", so the separator is part of
 * the pattern; this keeps pools whose identifier merely starts with the same
 * characters (e.g. "app" and "app2") apart.
 *
 * @param identifier Identifier the pool was created with
 * @returns {*} Promise resolving with an array holding the contents of each
 *              matching pid file, as delivered by readFile without an
 *              encoding (i.e. raw file contents)
 */
WorkerPool.getPids = function ( identifier ) {
    var pattern = pidDir + identifier + '-*.pid';

    return glob( pattern ).then( function ( files ) {
        return Promise.all( files.map( function ( file ) {
            return readFile( file );
        } ) );
    } );
};
module.exports = WorkerPool; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment