Skip to content

Instantly share code, notes, and snippets.

@RobAWilkinson
Forked from formula1/Example-Child-Worker.js
Created August 31, 2016 20:06
Show Gist options
  • Save RobAWilkinson/960e7ff18594b8ad2a405e130f63d44b to your computer and use it in GitHub Desktop.
Save RobAWilkinson/960e7ff18594b8ad2a405e130f63d44b to your computer and use it in GitHub Desktop.
var util = require('util');
/**
 * Child-process container.
 *
 * Loads the worker function found at `location` (taken from process.argv[2]),
 * tells the parent it is 'ready', and then answers every incoming IPC message
 * `{ id, data }` with either `{ id, data: result }` or `{ id, error: string }`.
 *
 * The worker function is called as `fn(data, state)` where `state` is a single
 * object shared by all calls in this process. It may return a plain value or a
 * promise/thenable, and it may throw synchronously; every outcome is reported
 * back to the parent under the id of the originating message.
 */
(function(location){
  var fn = require(location);
  var state = {};

  process.on('message', function(message){
    // Calling fn inside a promise executor keeps the call synchronous while
    // turning a synchronous throw into a rejection and adopting any thenable
    // that fn returns, so a non-promise return value no longer breaks the reply.
    new Promise(function(resolve){
      resolve(fn(message.data, state));
    }).then(function(result){
      process.send({
        id : message.id,
        data : result
      });
    }, function(err){
      process.send({
        id : message.id,
        // Errors are sent in their util.inspect() string form.
        error : util.inspect(err)
      });
    });
  });

  process.send('ready');
})(process.argv[ 2 ]);
var ForkHandle = require('./fork-handle');
var child_process = require('child_process');
var forkContainerPath = __dirname + '/child-process-container';
/**
 * ForkHandle implementation backed by `child_process.fork`.
 *
 * The child runs `child-process-container`, which loads the module at
 * `childPath` and answers `{ id, data }` messages with `{ id, data }` or
 * `{ id, error }` replies.
 */
var constructor = ForkHandle.cloneConstrustor();

constructor.prototype = Object.assign(
  Object.create(ForkHandle.prototype),
  {
    constructor : constructor,

    /**
     * Forks the container and waits for its first message.
     *
     * @param {string} childPath - module path handed to the container as its first argument.
     * @param {*} [ops] - optional options; appended as a JSON string argument when serializable.
     * @returns {Promise<undefined>} resolves once the child has sent 'ready';
     *   rejects if the child errors, exits, or sends anything else first.
     */
    create : function(childPath, ops){
      return new Promise((res, rej) => {
        var forkArgs = [ childPath ];
        // JSON.stringify yields undefined for undefined/function values; only
        // forward the argument when there is an actual string to forward.
        var serializedOps = JSON.stringify(ops);
        if(serializedOps !== void 0) forkArgs.push(serializedOps);

        var fork = child_process.fork(
          forkContainerPath,
          forkArgs,
          { stdio : 'inherit' }
        );

        // Whichever of the three start-up events fires first wins; the other
        // two listeners are removed so they cannot settle the promise again.
        var detach = function(){
          fork.removeListener('error', onError);
          fork.removeListener('exit', onExit);
          fork.removeListener('message', onMessage);
        };
        var onError = function(err){
          detach();
          rej(err);
        };
        var onExit = function(){
          detach();
          rej(new Error('Child Process exited'));
        };
        var onMessage = (message) => {
          detach();
          if(message === 'ready'){
            this.fork = fork;
            return res();
          }
          // The child is alive but not speaking the expected protocol; kill it
          // so that a failed create() does not leave a stray process behind.
          fork.kill();
          rej(new Error('Incorrect Message Sent'));
        };

        fork.once('error', onError).once('exit', onExit).once('message', onMessage);
      });
    },

    /**
     * Sends `args` to the child and waits for the reply carrying the same id.
     *
     * @param {*} args - payload delivered to the worker function as its `data`.
     * @returns {Promise<*>} resolves with the reply's `data`; rejects with the
     *   reply's `error`, with the send error, or with an Error if the child
     *   exits before replying.
     */
    run : function(args){
      var fork = this.fork;
      if(!fork) return Promise.reject(new Error('Child Process has not been created'));

      var id = Math.random().toString(32).substring(2);
      return new Promise(function(res, rej){
        var detach = function(){
          fork.removeListener('message', onMessage);
          fork.removeListener('exit', onExit);
        };
        var onMessage = function(message){
          // Replies to other concurrent run() calls arrive on the same channel.
          if(message.id !== id) return;
          detach();
          if(message.error) return rej(message.error);
          res(message.data);
        };
        var onExit = function(){
          // Without this the promise would stay pending forever when the child
          // dies mid-run, and the message listener would never be removed.
          detach();
          rej(new Error('Child Process exited'));
        };

        fork.on('message', onMessage);
        fork.once('exit', onExit);
        fork.send({
          id : id,
          data : args
        }, function(err){
          if(!err) return;
          detach();
          rej(err);
        });
      });
    },

    /**
     * Kills the child process, if there is one that is still running.
     *
     * @returns {Promise} resolves immediately when there is nothing to kill,
     *   otherwise when the child's 'exit' event fires.
     */
    destroy : function(){
      var fork = this.fork;
      // exitCode/signalCode are only set once the process has terminated.
      if(!fork || fork.exitCode !== null || fork.signalCode !== null){
        return Promise.resolve();
      }
      return new Promise(function(res){
        fork.once('exit', res);
        fork.kill();
      });
    }
  }
);

module.exports = constructor;
module.exports = function(){
var date = Date.now();
while(Date.now() - date < 5000);
return 'ok';
}
// Example: run the example child four times through a Manager-controlled pool
// of child processes and print the collected results.
var path = require('path'); // was used below without ever being required
var ForkHandle = require('./child-process-handle');
var Manager = require('./Manager');

var __child = path.join(__dirname, './Example-Child.js');
// Pre-binding the child path turns the handle constructor into a zero-argument
// worker factory for the Manager.
var runner = new Manager(ForkHandle.bind(void 0, __child));

Promise.all([ 0, 1, 2, 3 ].map(function(){
  return runner.run('hey');
})).then(function(results){
  console.log(results);
}, function(err){
  // Surface failures instead of leaving an unhandled rejection.
  console.error(err);
  process.exitCode = 1;
});
var EE = require('events').EventEmitter;
// method create
// event ready
// method run
// method destroy
// event destroy
const STATES = {
DORMANT : 'dormant',
CREATING : 'creating',
READY : 'ready',
DESTROYING : 'destroying',
DEAD : 'dead'
};
var ForkHandle;
module.exports = ForkHandle = function(functionLocation, ops){
EE.call(this);
this.state = STATES.DORMANT;
var create = this.create.bind(this);
this.create = function(){
this.state = STATES.CREATING;
return create.apply(this, [ functionLocation, ops ]).then(function(){
this.state = STATES.READY;
}, (err)=>{
return this.destroy().then(function(){
throw err;
});
});
};
var destroy = this.destroy.bind(this);
this.destroy = function(){
this.state = STATES.DESTROYING;
return destroy.call(this).then(function(){
this.state = STATES.DEAD;
});
};
};
ForkHandle.prototype = Object.assign(
{},
EE.prototype,
{
constructor : ForkHandle,
create : function(){
throw new Error('this should be abstract');
},
run : function(){
throw new Error('this should be abstract');
},
destroy : function(){
throw new Error('this should be abstract');
}
}
);
ForkHandle.cloneConstrustor = function(){
var Constructor = function(){
if(!(this instanceof Constructor)){
var fork = new Constructor(arguments[ 0 ], arguments[ 1 ], arguments[ 2 ]);
return fork.create().then(function(){
return fork;
});
}
ForkHandle.apply(this, arguments);
};
return Constructor;
};
var minmax = require('./minmax-strategy');
var Manager;
module.exports = Manager = function(WorkerClass, shouldCheckers){
this.queue = [];
this.workers = [];
this.running_workers = 0;
this.WorkerClass = WorkerClass;
this.shouldCheckers = Object.assign({}, shouldCheckers || minmax());
this.unusedDestroyHandler = this.unusedDestroyHandler.bind(this);
var looper;
(looper = ()=>{
return Promise.resolve(
this.shouldCheckers.shouldCreateWorker(this.queue.length, this.running_workers)
).then((boo)=>{
if(!boo) return;
return this.createWorker()
.then(this.pushWorker.bind(this))
.then(looper);
});
})().then(function(){
console.log('manager ready');
});
};
Manager.prototype.run = function(args){
return this.reserveWorker().then((worker)=>{
return this.useWorker(worker, args).then((result) =>{
this.pushWorker(worker);
if(result.error) throw result.error;
return result.data;
}, (err) =>{
throw err;
});
});
};
Manager.prototype.reserveWorker = function(){
if(this.workers.length) return Promise.resolve(this.workers.shift());
return new Promise((res, rej)=>{
this.queue.push([ res, rej ]);
Promise.resolve(
this.shouldCheckers.shouldCreateWorker(this.queue.length, this.running_workers)
).then((boo)=>{
if(boo) return this.createWorker().then((worker)=>{
if(this.queue.length){
return this.queue.shift()[ 0 ](worker);
}
this.pushWorker(worker);
});
});
});
};
Manager.prototype.useWorker = function(worker, args, retries){
retries = retries || 0;
return new Promise((res, rej)=>{
var elist;
var finished = false;
var usedDestroyHandler;
worker.removeListener('destroy', this.unusedDestroyHandler);
worker.once('destroy', usedDestroyHandler = ()=>{
finished = true;
this.usedDestroyHandler(args, retries).then(res, rej);
});
worker.run(args).then(function(obj){
if(finished) return;
worker.removeListener('destroy', usedDestroyHandler);
worker.on('destroy', this.unusedDestroyHandler);
res({ data : obj });
}, function(err){
if(finished) return;
worker.removeListener('destroy', usedDestroyHandler);
worker.on('destroy', this.unusedDestroyHandler);
res({ error : err });
});
});
};
Manager.prototype.pushWorker = function(worker){
Promise.resolve(
this.shouldCheckers.shouldDestroyWorker(this.queue.length, this.running_workers)
).then((boo) =>{
if(boo){
worker.removeListener('destroy', this.unusedDestroyHandler);
return worker.destroy();
}
if(this.queue.length){
return this.queue.shift()[ 0 ](worker);
}
this.workers.push(worker);
});
};
Manager.prototype.createWorker = function(){
this.running_workers++;
return new Promise((res, rej) =>{
var worker = this.WorkerClass();
var rlist, elist;
worker.once('ready', rlist = function(){
worker.removeListener('error', elist);
res(worker);
}).once('error', elist = function(error){
worker.removeListener('ready', rlist);
rej(error);
});
}).then((worker) =>{
worker.once('destroy', this.removeWorker.bind(this, worker));
worker.once('destroy', this.unusedDestroyHandler);
return worker;
}, function(err){
this.running_workers--;
throw err;
});
};
Manager.prototype.usedDestroyHandler = function(args, retries){
return Promise.resolve(
this.shouldCheckers.shouldRetryRun(retries, this.queue.length, this.running_workers)
).then((boo)=>{
if(!boo) throw 'worker destroyed';
return this.reserveWorker();
}).then((worker)=>{
return this.useWorker(worker, args, retries + 1);
});
};
Manager.prototype.unusedDestroyHandler = function(){
Promise.resolve(
this.shouldCheckers.shouldCreateWorker(this.queue.length, this.running_workers)
).then((boo) =>{
if(!boo) return;
this.createWorker().then((newWorker) =>{
this.pushWorker(newWorker);
});
});
};
Manager.prototype.removeWorker = function(worker){
this.running_workers--;
var i = this.workers.indexOf(worker);
if(i > -1) this.workers.splice(1, i);
};
module.exports = function(minimum, maximum){
if(minimum === void 0 || minimum === null) minimum = 1;
if(maximum === void 0 || maximum === null) maximum = Number.POSITIVE_INFINITY;
return {
shouldCreateFork : function(queuelength, running_forks){
// if there are items in the queue create the fork
if(running_forks === maximum) return false;
if(running_forks < minimum) return true;
return queuelength > 0;
},
shouldRetryRun : function(retries, queuelength, running_forks){
// retry 4 times
return retries < 4 && queuelength === 0 && running_forks < maximum;
},
shouldDestroyFork : function(queuelength, running_forks){
return queuelength === 0 && running_forks > 1;
}
};
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment