Skip to content

Instantly share code, notes, and snippets.

@camspiers
Created July 15, 2014 04:04
Show Gist options
  • Save camspiers/075e2e22ca3d7a261d3e to your computer and use it in GitHub Desktop.
Save camspiers/075e2e22ca3d7a261d3e to your computer and use it in GitHub Desktop.
Basic worker pool for nodejs
var spawn = require( 'child_process' ).spawn;
var fs = require( 'fs' );
var Promise = require( 'promise' );
var glob = Promise.denodeify( require( 'glob' ) );
var readFile = Promise.denodeify( fs.readFile );
var pidDir = __dirname + '/../pids/';
var WORKER_LISTEN_TIMEOUT = 10;
/**
* Creates a pool of worker processes with respawning and graceful exit
* @param numberOfWorkers
* @param binary
* @param command
* @param identifier
* @constructor
*/
function WorkerPool( numberOfWorkers, binary, command, identifier ) {
this.workers = [];
this.num = numberOfWorkers;
this.bin = binary;
this.cmd = command;
this.identifier = identifier;
this.spawn();
process.on( 'SIGTERM', function() {
console.log( 'SIGTERM received, gracefully shutting down workers' );
this.exit();
process.exit();
}.bind( this ) );
process.on( 'SIGHUP', function() {
console.log( 'SIGUP received, killing workers and restarting them' );
// Remove the auto-restart on the worker
this.workers.forEach( function( worker ) {
worker.removeAllListeners( 'exit' );
} );
this.workerSwap();
}.bind( this ) );
}
/**
* Runs through workers, kills, spawns new and waits for listening message
* @param id
*/
WorkerPool.prototype.workerSwap = function ( id ) {
id = id || 0;
console.log( 'Swapping worker ' + id );
this.killWorker( this.workers[ id ] );
this.startWorker( id, true ).then(
function ( worker ) {
console.log( 'Worker ' + id + ' listening' );
this.addWorker( id, worker );
id++;
if ( id < this.num ) {
this.workerSwap( id );
}
}.bind( this ),
function ( err ) {
// Failed
console.log( 'Worker ' + id + ' Failed to listen, exiting process' );
process.exit();
}
);
};
/**
*
* @param id
* @returns {string}
*/
WorkerPool.prototype.getPidPath = function ( id ) {
return pidDir + this.identifier + '-' + id + '.pid';
};
/**
*
* @param worker
*/
WorkerPool.prototype.createPid = function ( worker ) {
fs.writeFileSync( this.getPidPath( worker.id ), worker.pid );
};
/**
*
* @param worker
*/
WorkerPool.prototype.deletePid = function ( worker ) {
fs.unlinkSync( this.getPidPath( worker.id ) );
};
/**
* Starts a worker with a given id
* @param id
* @param ensureListening
* @returns {*}
*/
WorkerPool.prototype.startWorker = function ( id, ensureListening ) {
console.log( 'Starting worker ' + id );
process.env.id = id;
var worker = spawn(
this.bin,
this.cmd.concat( '--id', id ),
{ stdio: [0, 1, 2, 'ipc'], env: process.env }
);
worker.id = id;
worker.on( 'exit', function () {
console.log( 'Worker ' + id + ' exited, restarting worker');
this.addWorker( id, this.startWorker( id, false ) );
}.bind( this ) );
this.createPid( worker );
if ( ensureListening ) {
return new Promise( function ( resolve, reject ) {
worker.on( 'message', function ( data ) {
if ( data == 'listening' ) {
resolve( worker );
}
} );
setTimeout( function () {
reject();
}, 1000 * WORKER_LISTEN_TIMEOUT );
} );
} else {
return worker;
}
};
/**
* Add a worker to the array
* @param id
* @param worker
*/
WorkerPool.prototype.addWorker = function ( id, worker ) {
this.workers[ id ] = worker;
};
/**
* Spawn all the processes
*/
WorkerPool.prototype.spawn = function () {
for ( var id = 0; id < this.num; id++ ) {
this.addWorker( id, this.startWorker( id ) );
}
};
/**
* Gracefully exit
*/
WorkerPool.prototype.exit = function () {
this.workers.forEach( function( worker ) {
this.killWorker( worker );
}.bind( this ) );
this.workers = [];
};
/**
* Kill a worker
* @param worker
*/
WorkerPool.prototype.killWorker = function ( worker ) {
worker.kill();
this.deletePid( worker );
};
/**
*
* @param identifier
* @returns {*}
*/
WorkerPool.getPids = function ( identifier ) {
return glob( pidDir + identifier + "*.pid" ).then( function ( files ) {
return Promise.all( files.map( function ( file ) {
return readFile( file );
} ) );
} );
};
module.exports = WorkerPool;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment