Skip to content

Instantly share code, notes, and snippets.

@formula1
Last active August 31, 2016 21:22
Show Gist options
  • Save formula1/d7736993783849ec48e972f51fb048fb to your computer and use it in GitHub Desktop.
Save formula1/d7736993783849ec48e972f51fb048fb to your computer and use it in GitHub Desktop.
var util = require('util');
/**
 * Container script executed inside the forked child process.
 *
 * Loads the module found at `location` (taken from process.argv[2]), then
 * answers every IPC message `{ id, data }` by calling the loaded function with
 * `(data, state)` and sending back `{ id, data }` on success or
 * `{ id, error }` (an inspected string) on failure. A plain 'ready' message is
 * sent once the listener is installed.
 */
(function(location){
  var handler = require(location);
  // One object shared by every invocation handled in this process.
  var sharedState = {};

  function invoke(message){
    console.log('about to start');
    return handler(message.data, sharedState);
  }

  process.on('message', function(message){
    var onSuccess = function(result){
      console.log('finished');
      process.send({
        id : message.id,
        data : result
      });
    };
    var onFailure = function(err){
      process.send({
        id : message.id,
        error : util.inspect(err)
      });
    };

    // Start from a resolved promise so a synchronous throw from the handler
    // is reported through onFailure as well.
    Promise.resolve()
      .then(function(){
        return invoke(message);
      })
      .then(onSuccess, onFailure);
  });

  process.send('ready');
})(process.argv[ 2 ]);
var ForkHandle = require('./ForkHandle');
var child_process = require('child_process');
var forkContainerPath = __dirname + '/child-process-container';
var constructor = ForkHandle.cloneConstrustor();

/**
 * ForkHandle implementation backed by `child_process.fork`.
 *
 * The forked container script is expected to send the string 'ready' as its
 * first message and afterwards to answer `{ id, data }` requests with
 * `{ id, data }` or `{ id, error }`.
 */
constructor.prototype = Object.assign(
  Object.create(ForkHandle.prototype),
  {
    constructor : constructor,

    /**
     * Forks the container and waits for its 'ready' handshake.
     *
     * @param {string} childPath - module path handed to the container as its first argument.
     * @param {*} [ops] - optional options; forwarded as a JSON string only when serialisable.
     * @returns {Promise<undefined>} resolves once the child reported 'ready';
     *   rejects on a spawn error, an early exit or an unexpected first message.
     */
    create : function(childPath, ops){
      var forkArgs = [ childPath ];
      // JSON.stringify yields undefined for undefined/functions; never pass
      // that through as an argv entry.
      var serializedOps = JSON.stringify(ops);
      if(serializedOps !== void 0) forkArgs.push(serializedOps);

      return new Promise(function(res, rej){
        var fork = child_process.fork(
          forkContainerPath,
          forkArgs,
          { stdio : 'inherit' }
        );
        var onError, onExit, onMessage;
        // Whichever handshake listener fires first removes the other two.
        var cleanup = function(){
          fork.removeListener('error', onError);
          fork.removeListener('exit', onExit);
          fork.removeListener('message', onMessage);
        };
        onError = function(err){
          cleanup();
          rej(err);
        };
        onExit = function(){
          cleanup();
          rej(new Error('Child Process exited'));
        };
        onMessage = function(message){
          cleanup();
          if(message === 'ready') return res(fork);
          // The child is alive but not speaking the protocol: do not leak it.
          fork.kill();
          rej(new Error('Incorrect Message Sent'));
        };
        fork.once('error', onError).once('exit', onExit).once('message', onMessage);
      }).then((fork) =>{
        this.fork = fork;
      });
    },

    /**
     * Sends `args` to the child and waits for the reply carrying the same id.
     *
     * @param {*} args - payload delivered to the child as `data`.
     * @returns {Promise<*>} resolves with the reply's `data`, rejects with its `error`.
     */
    run : function(args){
      var id = Math.random().toString(32).substring(2);
      var mLis;
      var fork = this.fork;
      return new Promise(function(res, rej){
        fork.on('message', mLis = function(message){
          console.log('message recieved', message);
          // Replies for other in-flight requests share this channel.
          if(message.id !== id) return;
          fork.removeListener('message', mLis);
          if(message.error) return rej(message.error);
          res(message.data);
        });
        fork.send({
          id : id,
          data : args
        });
      });
    },

    /**
     * Kills the child process, if there is a connected one.
     *
     * @returns {Promise} resolves immediately when there is nothing to kill,
     *   otherwise when the child emits 'exit'.
     */
    destroy : function(){
      var fork = this.fork;
      if(!fork || !fork.connected) return Promise.resolve();
      return new Promise(function(res){
        // 'exit' fires once; `once` avoids keeping the listener around.
        fork.once('exit', res);
        fork.kill();
      });
    }
  }
);
module.exports = constructor;
module.exports = function(){
var date = Date.now();
while(Date.now() - date < 5000);
return 'ok';
}
module.exports = function(){
var date = Date.now();
while(Date.now() - date < 5000);
return 'ok';
}
// Example: run the same job four times through a Manager backed by Web Workers.
//
// `self` is used instead of `window` because importScripts() only exists in
// worker scope, where `window` is not defined; in a window context `self`
// refers to the same global object, so nothing changes there.
// NOTE(review): the scripts loaded below must themselves be loadable through
// importScripts (no CommonJS require) - confirm before relying on this example.
self.module = {};
importScripts('./webworker-handle.js');
var WebWorkerHandle = self.module.exports;
// Fresh `module` object so the next script's exports do not mix with the previous one.
self.module = {};
importScripts('./Manager.js');
var Manager = self.module.exports;
var __child = './Example-Child.js';
var runner = new Manager(WebWorkerHandle.bind(void 0, __child));
Promise.all([0, 1, 2, 3].map(function(arg){
  return runner.run('hey').then(function(value){
    console.log('finished with ', arg, value);
    return value;
  });
})).then(function(results){
  console.log('Results:', results);
}).catch(function(e){
  console.error(e);
});
// Example: run the same job four times through a Manager backed by forked
// child processes and print every individual result plus the combined list.
var ForkHandle = require('./child-process-handle');
var Manager = require('./Manager');
var path = require('path');
var __child = path.join(__dirname, './Example-Child.js');
var runner = new Manager(ForkHandle.bind(void 0, __child));

// Submits one job and logs its value tagged with the job number.
function runJob(arg){
  var pending = runner.run('hey');
  return pending.then(function(value){
    console.log('finished with ', arg, value);
    return value;
  });
}

var jobs = [0, 1, 2, 3].map(function(arg){
  return runJob(arg);
});

Promise.all(jobs).then(
  function(results){
    console.log('Results:', results);
  }
).catch(
  function(e){
    console.error(e);
  }
);
var EE = require('events').EventEmitter;
// method create
// event ready
// method run
// method destroy
// event destroy
// Lifecycle states a handle moves through. Frozen so the shared table cannot
// be altered by accident at runtime.
const STATES = Object.freeze({
  DORMANT : 'dormant',
  CREATING : 'creating',
  READY : 'ready',
  DESTROYING : 'destroying',
  DEAD : 'dead'
});
var ForkHandle;
module.exports = ForkHandle = function(functionLocation, ops){
EE.call(this);
this.state = STATES.DORMANT;
var create = this.create.bind(this);
this.create = function(){
this.state = STATES.CREATING;
return create.apply(this, [ functionLocation, ops ]).then(function(){
this.state = STATES.READY;
}, (err)=>{
return this.destroy().then(function(){
throw err;
});
});
};
var destroy = this.destroy.bind(this);
this.destroy = function(){
this.state = STATES.DESTROYING;
return destroy.call(this).then(function(){
this.state = STATES.DEAD;
});
};
};
/**
 * ForkHandle instances are event emitters. The prototype inherits from
 * EventEmitter.prototype (rather than copying its enumerable members) so
 * `instanceof EventEmitter` holds and every emitter method is reachable.
 *
 * `create`, `run` and `destroy` are placeholders that concrete handles must
 * override; calling them here throws.
 */
ForkHandle.prototype = Object.assign(
  Object.create(EE.prototype),
  {
    constructor : ForkHandle,
    create : function(){
      throw new Error('this should be abstract');
    },
    run : function(){
      throw new Error('this should be abstract');
    },
    destroy : function(){
      throw new Error('this should be abstract');
    }
  }
);
/**
 * Builds a fresh constructor that delegates to ForkHandle.
 *
 * The returned function has two calling modes:
 *  - with `new`: initialises the instance through ForkHandle and returns it;
 *  - without `new`: builds an instance from the first three arguments, calls
 *    its `create()` and returns a promise resolving with that instance.
 *
 * @returns {Function} the new constructor (its prototype is left for the caller to set).
 */
ForkHandle.cloneConstrustor = function(){
  function Constructor(){
    if(this instanceof Constructor){
      ForkHandle.apply(this, arguments);
      return;
    }
    // Factory mode: construct, create, then hand out the ready instance.
    console.log('about to create');
    var instance = new Constructor(arguments[ 0 ], arguments[ 1 ], arguments[ 2 ]);
    var creation = instance.create();
    return creation.then(function(){
      console.log('created');
      return instance;
    });
  }
  return Constructor;
};
var minmax = require('./minmax-strategy');
var Manager;
module.exports = Manager = function(WorkerClass, shouldCheckers){
this.queue = [];
this.workers = [];
this.running_workers = 0;
this.WorkerClass = WorkerClass;
this.shouldCheckers = Object.assign({}, shouldCheckers || minmax());
console.log(this.shouldCheckers);
this.unusedDestroyHandler = this.unusedDestroyHandler.bind(this);
var looper;
(looper = ()=>{
return Promise.resolve(
this.shouldCheckers.shouldCreateWorker(this.queue.length, this.running_workers)
).then((boo)=>{
if(!boo) return;
return this.createWorker()
.then(this.pushWorker.bind(this))
.then(looper);
});
})().then(function(){
console.log('manager ready');
});
};
/**
 * Runs one job: reserves a worker, executes `args` on it and hands the worker
 * back through `pushWorker` whether the job succeeded or failed.
 *
 * @param {*} args - payload forwarded to `useWorker`.
 * @returns {Promise<*>} settles with the outcome of `useWorker`.
 */
Manager.prototype.run = function(args){
  var manager = this;
  console.log('about to reserve worker');
  return manager.reserveWorker().then(function(worker){
    console.log('got worker');

    function onDone(result){
      console.log('finished with worker');
      manager.pushWorker(worker);
      return result;
    }

    function onFail(err){
      manager.pushWorker(worker);
      throw err;
    }

    return manager.useWorker(worker, args).then(onDone, onFail);
  });
};
/**
 * Obtains a worker for exclusive use.
 *
 * An idle worker is returned immediately when one exists. Otherwise the
 * request is queued as a `[resolve, reject]` pair and, if the strategy allows
 * it, a new worker is created and passed to `pushWorker`.
 *
 * When that creation fails, the oldest queued request is rejected with the
 * error. Arrow functions keep `this` bound to the manager inside the handler.
 *
 * @returns {Promise<Object>} resolves with the reserved worker.
 */
Manager.prototype.reserveWorker = function(){
  if(this.workers.length) return Promise.resolve(this.workers.shift());
  return new Promise((res, rej) =>{
    this.queue.push([ res, rej ]);
    Promise.resolve(
      this.shouldCheckers.shouldCreateWorker(this.queue.length, this.running_workers)
    ).then((boo) =>{
      console.log('should create worker', boo);
      if(!boo) return;
      return this.createWorker().then((worker) =>{
        return this.pushWorker(worker);
      });
    }).catch((e) =>{
      console.error('error in create', e);
      if(this.queue.length){
        this.queue.shift()[ 1 ](e);
      }
    });
  });
};
/**
 * Executes `args` on `worker` while watching for the worker being destroyed.
 *
 * While the job runs, the idle 'destroy' listener is swapped for a one-shot
 * listener that delegates to `usedDestroyHandler(args, retries)`; once that
 * listener has fired, the outcome of `worker.run` is ignored. When the job
 * settles first, the idle listener is re-attached and the outcome forwarded.
 *
 * @param {Object} worker - emitter exposing `run(args)`.
 * @param {*} args - payload for `worker.run`.
 * @param {number} [retries] - attempts already made; a falsy value counts as 0.
 * @returns {Promise<*>} settles with the job outcome or the retry outcome.
 */
Manager.prototype.useWorker = function(worker, args, retries){
  var attempt = retries || 0;
  return new Promise((resolve, reject) =>{
    var destroyedMidRun = false;
    var idleHandler = this.unusedDestroyHandler;
    var busyHandler = () =>{
      destroyedMidRun = true;
      console.error('destroyed before finish');
      this.usedDestroyHandler(args, attempt).then(resolve, reject);
    };
    // Puts the listeners back into their "worker is idle" arrangement.
    var markIdle = function(){
      worker.removeListener('destroy', busyHandler);
      worker.on('destroy', idleHandler);
    };

    worker.removeListener('destroy', idleHandler);
    worker.once('destroy', busyHandler);

    worker.run(args).then(function(obj){
      console.log(destroyedMidRun, obj);
      if(destroyedMidRun) return;
      markIdle();
      resolve(obj);
    }, function(err){
      console.error(err);
      if(destroyedMidRun) return;
      markIdle();
      reject(err);
    });
  });
};
/**
 * Hands a free worker back to the pool.
 *
 * Depending on the strategy and the queue the worker is destroyed, given to
 * the oldest queued request, or stored as idle.
 *
 * @param {Object} worker - the worker that became available.
 * @returns {Promise<*>} resolves with the result of `worker.destroy()`, with
 *   whatever the queued resolver returned, or with undefined when stored.
 */
Manager.prototype.pushWorker = function(worker){
  console.log('pushing worker');
  var verdict = this.shouldCheckers.shouldDestroyWorker(
    this.queue.length,
    this.running_workers
  );
  return Promise.resolve(verdict).then((shouldDestroy) =>{
    if(shouldDestroy){
      // A deliberate shutdown must not trigger the idle replacement logic.
      worker.removeListener('destroy', this.unusedDestroyHandler);
      return worker.destroy();
    }
    if(!this.queue.length){
      this.workers.push(worker);
      return;
    }
    var waiting = this.queue.shift();
    return waiting[ 0 ](worker);
  });
};
/**
 * Creates a worker through `WorkerClass` and wires its 'destroy' listeners.
 *
 * `running_workers` is incremented up front and decremented again when
 * creation rejects. The rejection handler is an arrow function so `this`
 * refers to the manager and the original error is propagated unchanged.
 *
 * @returns {Promise<Object>} resolves with the new worker.
 */
Manager.prototype.createWorker = function(){
  this.running_workers++;
  return this.WorkerClass().then((worker) =>{
    console.log('workerclass created');
    worker.once('destroy', this.removeWorker.bind(this, worker));
    worker.once('destroy', this.unusedDestroyHandler);
    return worker;
  }, (err) =>{
    this.running_workers--;
    throw err;
  });
};
/**
 * Decides what happens to a job whose worker was destroyed while running it.
 *
 * Asks the strategy whether to retry; if so, reserves another worker and runs
 * the job again with the retry counter increased by one. Otherwise the
 * returned promise rejects with the string 'worker destroyed'.
 *
 * @param {*} args - the job payload.
 * @param {number} retries - attempts made so far.
 * @returns {Promise<*>} outcome of the retried job.
 */
Manager.prototype.usedDestroyHandler = function(args, retries){
  var manager = this;
  var verdict = manager.shouldCheckers.shouldRetryRun(
    retries,
    manager.queue.length,
    manager.running_workers
  );
  return Promise.resolve(verdict).then(function(shouldRetry){
    if(shouldRetry) return manager.reserveWorker();
    throw 'worker destroyed';
  }).then(function(worker){
    return manager.useWorker(worker, args, retries + 1);
  });
};
/**
 * 'destroy' listener for workers that are not running a job: creates a
 * replacement worker when the strategy asks for one and pushes it to the pool.
 *
 * Used as an event listener, so nothing is returned; failures are logged
 * because no caller could handle the rejection.
 */
Manager.prototype.unusedDestroyHandler = function(){
  Promise.resolve(
    this.shouldCheckers.shouldCreateWorker(this.queue.length, this.running_workers)
  ).then((boo) =>{
    if(!boo) return;
    return this.createWorker().then((newWorker) =>{
      return this.pushWorker(newWorker);
    });
  }).catch((err) =>{
    console.error('error replacing destroyed worker', err);
  });
};
/**
 * Forgets a worker that has gone away: lowers the running count and removes
 * the worker from the idle list when it is in there.
 *
 * @param {Object} worker - the worker to remove.
 */
Manager.prototype.removeWorker = function(worker){
  this.running_workers--;
  var i = this.workers.indexOf(worker);
  // splice(start, deleteCount): drop exactly the one entry found at index i.
  if(i > -1) this.workers.splice(i, 1);
};
module.exports = function(minimum, maximum){
if(minimum === void 0 || minimum === null) minimum = 1;
if(maximum === void 0 || maximum === null) maximum = Number.POSITIVE_INFINITY;
return {
shouldCreateWorker : function(queuelength, running_forks){
// if there are items in the queue create the fork
if(running_forks === maximum) return false;
if(running_forks < minimum) return true;
return queuelength > 0;
},
shouldRetryRun : function(retries, queuelength, running_forks){
// retry 4 times
return retries < 4 && queuelength === 0 && running_forks < maximum;
},
shouldDestroyWorker : function(queuelength, running_forks){
return queuelength === 0 && running_forks > 1;
}
};
};
/**
 * Container script executed inside the Web Worker.
 *
 * The script to run is taken from the worker URL's fragment. It is loaded with
 * importScripts and is expected to assign a function to `module.exports`.
 * Every message `{ id, data }` is answered with `{ id, data }` on success or
 * `{ id, error }` (a string) on failure; the string 'ready' is posted once the
 * listener is installed.
 */
(function(location){
  self.module = {};
  importScripts(location);
  var fn = self.module.exports;
  if(!fn) throw new Error('Function not available');
  // One object shared by every invocation handled in this worker.
  var state = {};
  self.addEventListener('message', function(evt){
    var message = evt.data;
    Promise.resolve().then(function(){
      console.log('about to start');
      return fn(message.data, state);
    }).then(function(result){
      console.log('finished');
      self.postMessage({
        id : message.id,
        data : result
      });
    }, function(err){
      // Node's util.inspect is not available here; send a plain string that
      // survives structured cloning (stack when present, else the value itself).
      self.postMessage({
        id : message.id,
        error : String(err && err.stack ? err.stack : err)
      });
    });
  });
  self.postMessage('ready');
// location.hash includes the leading '#', which is not part of the script path.
})(self.location.hash.replace(/^#/, ''));
// `self` is used rather than `window`: importScripts only exists in worker
// scope, where `window` is undefined; in a window context both names refer to
// the same global object.
var oldModule = self.module;
self.module = {};
importScripts('./ForkHandle.js');
var ForkHandle = self.module.exports;
self.module = oldModule;
var webWorkerLocation = './webworker-container.js';
var constructor = ForkHandle.cloneConstrustor();

/**
 * ForkHandle implementation backed by a Web Worker.
 *
 * The container script receives the child script path through the URL
 * fragment and is expected to post the string 'ready' as its first message,
 * then to answer `{ id, data }` requests with `{ id, data }` or `{ id, error }`.
 */
constructor.prototype = Object.assign(
  Object.create(ForkHandle.prototype),
  {
    constructor : constructor,

    /**
     * Starts the worker and waits for its 'ready' handshake.
     *
     * @param {string} childPath - script the container should load.
     * @param {*} [ops] - accepted for interface parity; not forwarded to the worker.
     * @returns {Promise<undefined>} resolves once 'ready' was received; rejects
     *   on a worker error or an unexpected first message (the worker is
     *   terminated in both cases).
     */
    create : function(childPath, ops){
      return new Promise(function(res, rej){
        var fork = new Worker(webWorkerLocation + '#' + childPath);
        var onError, onMessage;
        // addEventListener has no "once" here, so both handshake listeners
        // are removed explicitly as soon as either fires.
        var settle = function(){
          fork.removeEventListener('error', onError);
          fork.removeEventListener('message', onMessage);
        };
        onError = function(err){
          settle();
          fork.terminate();
          rej(err);
        };
        onMessage = function(event){
          settle();
          // The payload lives on the event's `data` property.
          if(event.data === 'ready') return res(fork);
          fork.terminate();
          rej(new Error('Incorrect Message Sent'));
        };
        fork.addEventListener('error', onError);
        fork.addEventListener('message', onMessage);
      }).then((fork) =>{
        this.fork = fork;
      });
    },

    /**
     * Posts `args` to the worker and waits for the reply carrying the same id.
     *
     * @param {*} args - payload delivered to the worker as `data`.
     * @returns {Promise<*>} resolves with the reply's `data`, rejects with its `error`.
     */
    run : function(args){
      var id = Math.random().toString(32).substring(2);
      var mLis;
      var fork = this.fork;
      return new Promise(function(res, rej){
        fork.addEventListener('message', mLis = function(event){
          var message = event.data;
          console.log('message recieved', message);
          // Replies for other in-flight requests share this channel.
          if(message.id !== id) return;
          fork.removeEventListener('message', mLis);
          if(message.error) return rej(message.error);
          res(message.data);
        });
        fork.postMessage({
          id : id,
          data : args
        });
      });
    },

    /**
     * Terminates the worker, if any.
     *
     * Worker#terminate stops the worker immediately and no event signals
     * completion, so the returned promise is already resolved.
     *
     * @returns {Promise<undefined>}
     */
    destroy : function(){
      var fork = this.fork;
      if(!fork) return Promise.resolve();
      this.fork = void 0;
      fork.terminate();
      return Promise.resolve();
    }
  }
);
module.exports = constructor;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment