-
-
Save RobAWilkinson/960e7ff18594b8ad2a405e130f63d44b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var util = require('util'); | |
/**
 * Child-side container. Loads the worker function from the module path given
 * as the first command-line argument and calls it once per IPC message.
 *
 * Every incoming message is read for `id` and `data`. The reply echoes `id`
 * and carries either `data` (the function's result) or `error` (the failure,
 * rendered with util.inspect). `state` is a single object handed to every
 * call made in this process.
 */
(function(location){
  var fn = require(location);
  var state = {};
  process.on('message', function(message){
    // Call fn inside a promise executor so that a plain return value, a
    // returned promise and a synchronous throw are all handled the same way.
    new Promise(function(resolve){
      resolve(fn(message.data, state));
    }).then(function(result){
      process.send({
        id : message.id,
        data : result
      });
    }, function(err){
      process.send({
        id : message.id,
        error : util.inspect(err)
      });
    });
  });
  // Signal the parent that the message listener is installed.
  process.send('ready');
})(process.argv[ 2 ]);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var ForkHandle = require('./fork-handle'); | |
var child_process = require('child_process'); | |
var forkContainerPath = __dirname + '/child-process-container'; | |
/**
 * ForkHandle implementation backed by child_process.fork.
 *
 * The child runs the container script at `forkContainerPath`; the worker
 * module path (and, when given, the JSON-encoded options) are passed to it as
 * command-line arguments.
 */
var constructor = ForkHandle.cloneConstrustor();
constructor.prototype = Object.assign(
  Object.create(ForkHandle.prototype),
  {
    constructor : constructor,
    /**
     * Forks the container and waits for its first message.
     *
     * @param {string} childPath - module path handed to the container.
     * @param {*} [ops] - options, forwarded JSON-encoded when serialisable.
     * @returns {Promise<undefined>} resolves once the child sent 'ready';
     *   rejects if the child errors, exits, or sends anything else first.
     */
    create : function(childPath, ops){
      // JSON.stringify yields undefined for undefined/functions; only forward
      // a real string so the child never receives a bogus argument.
      var serialized = JSON.stringify(ops);
      var forkArgs = serialized === void 0 ? [ childPath ] : [ childPath, serialized ];
      return new Promise((res, rej) =>{
        var fork = child_process.fork(forkContainerPath, forkArgs, { stdio : 'inherit' });
        var elis, slis, jlis;
        // Whichever of the three events fires first wins and unhooks the others.
        fork.once('error', jlis = function(err){
          fork.removeListener('message', slis);
          fork.removeListener('exit', elis);
          rej(err);
        }).once('exit', elis = function(){
          fork.removeListener('error', jlis);
          fork.removeListener('message', slis);
          rej(new Error('Child Process exited'));
        }).once('message', slis = (message) =>{
          fork.removeListener('exit', elis);
          fork.removeListener('error', jlis);
          if(message === 'ready'){
            this.fork = fork;
            return res();
          }
          // this.fork is never set on this path, so destroy() could not reach
          // the child; kill it here instead of leaking the process.
          fork.kill();
          rej(new Error('Incorrect Message Sent'));
        });
      });
    },
    /**
     * Sends `args` to the child and waits for the reply carrying the same id.
     *
     * @param {*} args - payload placed in the message's `data` field.
     * @returns {Promise<*>} resolves with the reply's `data`, rejects with
     *   the reply's `error` when one is present.
     */
    run : function(args){
      var id = Math.random().toString(32).substring(2);
      var mLis;
      var fork = this.fork;
      return new Promise(function(res, rej){
        fork.on('message', mLis = function(message){
          // Replies to other concurrent runs arrive on the same channel.
          if(message.id !== id) return;
          fork.removeListener('message', mLis);
          if(message.error) return rej(message.error);
          res(message.data);
        });
        fork.send({
          id : id,
          data : args
        });
      });
    },
    /**
     * Kills the child, if there is a live connected one.
     *
     * @returns {Promise} resolves immediately when there is nothing to kill,
     *   otherwise once the child has emitted 'exit'.
     */
    destroy : function(){
      var fork = this.fork;
      if(!fork || !fork.connected) return Promise.resolve();
      // A child that already exited will not emit 'exit' again.
      if(fork.exitCode !== null || fork.signalCode !== null) return Promise.resolve();
      return new Promise(function(res){
        fork.once('exit', res);
        fork.kill();
      });
    }
  }
);
module.exports = constructor;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module.exports = function(){ | |
var date = Date.now(); | |
while(Date.now() - date < 5000); | |
return 'ok'; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example: push four 'hey' jobs through a Manager whose workers are
// child-process handles running Example-Child.js, then print the results.
var path = require('path'); // needed for path.join below
var ForkHandle = require('./child-process-handle');
var Manager = require('./Manager');
var __child = path.join(__dirname, './Example-Child.js');
var runner = new Manager(ForkHandle.bind(void 0, __child));
Promise.all([0, 1, 2, 3].map(function(){
  return runner.run('hey');
})).then(function(results){
  console.log(results);
}, function(err){
  // Report a failed job instead of leaving the rejection unhandled.
  console.error(err);
});
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var EE = require('events').EventEmitter; | |
// method create | |
// event ready | |
// method run | |
// method destroy | |
// event destroy | |
const STATES = { | |
DORMANT : 'dormant', | |
CREATING : 'creating', | |
READY : 'ready', | |
DESTROYING : 'destroying', | |
DEAD : 'dead' | |
}; | |
var ForkHandle; | |
module.exports = ForkHandle = function(functionLocation, ops){ | |
EE.call(this); | |
this.state = STATES.DORMANT; | |
var create = this.create.bind(this); | |
this.create = function(){ | |
this.state = STATES.CREATING; | |
return create.apply(this, [ functionLocation, ops ]).then(function(){ | |
this.state = STATES.READY; | |
}, (err)=>{ | |
return this.destroy().then(function(){ | |
throw err; | |
}); | |
}); | |
}; | |
var destroy = this.destroy.bind(this); | |
this.destroy = function(){ | |
this.state = STATES.DESTROYING; | |
return destroy.call(this).then(function(){ | |
this.state = STATES.DEAD; | |
}); | |
}; | |
}; | |
// Inherit from EventEmitter through the prototype chain (rather than copying
// its enumerable members) so handles are real EventEmitter instances.
// The three methods are placeholders that concrete handles must replace.
ForkHandle.prototype = Object.assign(
  Object.create(EE.prototype),
  {
    constructor : ForkHandle,
    create : function(){
      throw new Error('this should be abstract');
    },
    run : function(){
      throw new Error('this should be abstract');
    },
    destroy : function(){
      throw new Error('this should be abstract');
    }
  }
);
/**
 * Returns a fresh constructor that delegates to ForkHandle.
 *
 * Called with `new`, the returned constructor initialises the instance via
 * ForkHandle. Called as a plain function, it builds an instance from its
 * first three arguments, runs create() on it and returns a promise that
 * resolves with that instance.
 */
ForkHandle.cloneConstrustor = function(){
  var Constructor = function(){
    if(this instanceof Constructor){
      ForkHandle.apply(this, arguments);
      return;
    }
    // Factory-style call: construct, create, and hand back the ready handle.
    var handle = new Constructor(arguments[ 0 ], arguments[ 1 ], arguments[ 2 ]);
    var created = handle.create();
    return created.then(function(){
      return handle;
    });
  };
  return Constructor;
};
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var minmax = require('./minmax-strategy'); | |
var Manager; | |
module.exports = Manager = function(WorkerClass, shouldCheckers){ | |
this.queue = []; | |
this.workers = []; | |
this.running_workers = 0; | |
this.WorkerClass = WorkerClass; | |
this.shouldCheckers = Object.assign({}, shouldCheckers || minmax()); | |
this.unusedDestroyHandler = this.unusedDestroyHandler.bind(this); | |
var looper; | |
(looper = ()=>{ | |
return Promise.resolve( | |
this.shouldCheckers.shouldCreateWorker(this.queue.length, this.running_workers) | |
).then((boo)=>{ | |
if(!boo) return; | |
return this.createWorker() | |
.then(this.pushWorker.bind(this)) | |
.then(looper); | |
}); | |
})().then(function(){ | |
console.log('manager ready'); | |
}); | |
}; | |
/**
 * Runs one job: reserves a worker, uses it, hands it back via pushWorker and
 * unwraps the outcome.
 *
 * @param {*} args - forwarded to useWorker.
 * @returns {Promise<*>} resolves with the outcome's `data`; rejects with the
 *   outcome's `error` when set, or with whatever useWorker rejected with.
 */
Manager.prototype.run = function(args){
  var manager = this;
  return manager.reserveWorker().then(function(worker){
    // NOTE(review): if useWorker retried on a replacement worker, `worker` is
    // still the one reserved here - confirm handing it back is intended.
    var release = function(outcome){
      manager.pushWorker(worker);
      if(outcome.error) throw outcome.error;
      return outcome.data;
    };
    return manager.useWorker(worker, args).then(release);
  });
};
/**
 * Hands out an idle worker, or queues the request until one is available.
 *
 * When no worker is idle the request is queued and the strategy is asked
 * whether a new worker should be created. A newly created worker goes to the
 * head of the queue (or to pushWorker when the queue has drained meanwhile).
 *
 * @returns {Promise} resolves with a worker; rejects if the worker creation
 *   triggered by this request fails while the request is still queued.
 */
Manager.prototype.reserveWorker = function(){
  if(this.workers.length) return Promise.resolve(this.workers.shift());
  return new Promise((res, rej)=>{
    var request = [ res, rej ];
    this.queue.push(request);
    Promise.resolve(
      this.shouldCheckers.shouldCreateWorker(this.queue.length, this.running_workers)
    ).then((boo)=>{
      if(!boo) return;
      return this.createWorker().then((worker)=>{
        if(this.queue.length){
          return this.queue.shift()[ 0 ](worker);
        }
        this.pushWorker(worker);
      });
    }).catch((err)=>{
      // Creation failed: fail this request instead of leaving it queued
      // forever behind an unhandled rejection.
      var at = this.queue.indexOf(request);
      // Already served by another worker: nobody is left to notify.
      if(at === -1) return;
      this.queue.splice(at, 1);
      rej(err);
    });
  });
};
/**
 * Runs `args` on `worker`, watching for the worker being destroyed mid-run.
 *
 * While the job runs, the idle-time 'destroy' listener is swapped for one
 * that delegates to usedDestroyHandler (which decides about a retry). When
 * the job settles first, the idle-time listener is restored.
 *
 * @param {Object} worker - must offer run(), once/on/removeListener.
 * @param {*} args - passed to worker.run.
 * @param {number} [retries=0] - how many retries preceded this attempt.
 * @returns {Promise<Object>} resolves with `{ data }` or `{ error }` from the
 *   run, or settles with whatever usedDestroyHandler produces.
 */
Manager.prototype.useWorker = function(worker, args, retries){
  retries = retries || 0;
  return new Promise((res, rej)=>{
    var finished = false;
    var usedDestroyHandler;
    // Arrow functions throughout: `this` must stay the Manager so that
    // this.unusedDestroyHandler is the bound listener, not undefined.
    var settle = (outcome)=>{
      // The destroy path already took over this promise.
      if(finished) return;
      worker.removeListener('destroy', usedDestroyHandler);
      worker.on('destroy', this.unusedDestroyHandler);
      res(outcome);
    };
    worker.removeListener('destroy', this.unusedDestroyHandler);
    worker.once('destroy', usedDestroyHandler = ()=>{
      finished = true;
      this.usedDestroyHandler(args, retries).then(res, rej);
    });
    worker.run(args).then(function(obj){
      settle({ data : obj });
    }, function(err){
      settle({ error : err });
    });
  });
};
/**
 * Takes a worker that is free again and decides what happens to it: destroy
 * it when the strategy says so, otherwise give it to the oldest queued
 * request, otherwise park it in the idle list.
 *
 * The decision is made asynchronously; nothing is returned.
 *
 * @param {Object} worker - the worker being handed back.
 */
Manager.prototype.pushWorker = function(worker){
  var verdict = this.shouldCheckers.shouldDestroyWorker(this.queue.length, this.running_workers);
  Promise.resolve(verdict).then((shouldDestroy) =>{
    if(shouldDestroy){
      // Deliberate shutdown: do not let it trigger a replacement.
      worker.removeListener('destroy', this.unusedDestroyHandler);
      return worker.destroy();
    }
    if(this.queue.length === 0){
      this.workers.push(worker);
      return;
    }
    var waiting = this.queue.shift();
    return waiting[ 0 ](worker);
  });
};
/**
 * Creates one worker through `this.WorkerClass()` and wires its 'destroy'
 * listeners. `running_workers` is incremented up front and decremented again
 * if creation fails.
 *
 * The factory may return either a promise (taken to resolve with a worker
 * that is already created) or a worker object that announces itself with a
 * 'ready' or 'error' event.
 *
 * @returns {Promise<Object>} resolves with the new worker.
 */
Manager.prototype.createWorker = function(){
  this.running_workers++;
  return new Promise((res, rej) =>{
    var worker = this.WorkerClass();
    // Constructors from ForkHandle.cloneConstrustor return a promise when
    // called without `new`; adopt it instead of calling .once on a promise.
    if(worker && typeof worker.then === 'function') return res(worker);
    var rlist, elist;
    worker.once('ready', rlist = function(){
      worker.removeListener('error', elist);
      res(worker);
    }).once('error', elist = function(error){
      worker.removeListener('ready', rlist);
      rej(error);
    });
  }).then((worker) =>{
    worker.once('destroy', this.removeWorker.bind(this, worker));
    worker.once('destroy', this.unusedDestroyHandler);
    return worker;
  }, (err) =>{
    // Arrow function so `this` is the Manager and the counter is corrected.
    this.running_workers--;
    throw err;
  });
};
/**
 * Called when a worker was destroyed while running a job. Asks the strategy
 * whether the job may be retried and, if so, reruns it on another worker.
 *
 * @param {*} args - the job's arguments.
 * @param {number} retries - retries already spent on this job.
 * @returns {Promise<Object>} the outcome of the retried useWorker call;
 *   rejects with the string 'worker destroyed' when no retry is allowed.
 */
Manager.prototype.usedDestroyHandler = function(args, retries){
  var verdict = this.shouldCheckers.shouldRetryRun(retries, this.queue.length, this.running_workers);
  return Promise.resolve(verdict).then((mayRetry)=>{
    if(!mayRetry) throw 'worker destroyed';
    return this.reserveWorker().then((replacement)=>{
      return this.useWorker(replacement, args, retries + 1);
    });
  });
};
/**
 * 'destroy' listener for workers that are not running a job: asks the
 * strategy whether a replacement is wanted and, if so, creates one and
 * hands it to pushWorker. Works asynchronously and returns nothing.
 */
Manager.prototype.unusedDestroyHandler = function(){
  var verdict = this.shouldCheckers.shouldCreateWorker(this.queue.length, this.running_workers);
  Promise.resolve(verdict).then((wanted) =>{
    if(wanted){
      this.createWorker().then((replacement) =>{
        this.pushWorker(replacement);
      });
    }
  });
};
/**
 * Bookkeeping for a destroyed worker: lowers the running count and drops the
 * worker from the idle list if it is in there.
 *
 * @param {Object} worker - the worker that was destroyed.
 */
Manager.prototype.removeWorker = function(worker){
  this.running_workers--;
  var i = this.workers.indexOf(worker);
  // splice(start, deleteCount): remove exactly the one entry at index i.
  if(i > -1) this.workers.splice(i, 1);
};
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module.exports = function(minimum, maximum){ | |
if(minimum === void 0 || minimum === null) minimum = 1; | |
if(maximum === void 0 || maximum === null) maximum = Number.POSITIVE_INFINITY; | |
return { | |
shouldCreateFork : function(queuelength, running_forks){ | |
// if there are items in the queue create the fork | |
if(running_forks === maximum) return false; | |
if(running_forks < minimum) return true; | |
return queuelength > 0; | |
}, | |
shouldRetryRun : function(retries, queuelength, running_forks){ | |
// retry 4 times | |
return retries < 4 && queuelength === 0 && running_forks < maximum; | |
}, | |
shouldDestroyFork : function(queuelength, running_forks){ | |
return queuelength === 0 && running_forks > 1; | |
} | |
}; | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment