Last active
August 29, 2015 13:56
-
-
Save ritch/9287287 to your computer and use it in GitHub Desktop.
Micro Processes
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var MicroProcess = require('../lib/micro-process'); | |
var loopback = require('loopback'); | |
var app = loopback(); | |
// Wrap every request in its own MicroProcess, named "METHOD url", and hand
// control to the rest of the middleware chain from inside that process.
app.use(function(req, res, next) {
  var p = new MicroProcess([req.method, req.url].join(' '));

  // Listeners are attached before run(): anything the process emits while
  // `next` is still executing synchronously would otherwise have no listener.
  p.on('error', function(err) {
    console.error(err);
    if (res.headersSent) {
      // Too late to change the status line; just finish the response.
      res.end();
      return;
    }
    res.statusCode = 500;
    res.send(err.message);
  });

  p.on('exit', function() {
    // Answer on behalf of handlers that never responded. `headersSent` stays
    // false until something has been written to the response.
    if (!res.headersSent) {
      res.send('exit');
    }
  });

  p.run(next);
});
// Demo route: the handler deliberately never writes a response.
app.get('/auto', function autoRoute(req, res) {
  // we never call back..
  // but not to worry, when the
  // micro-process exits it will end this request
  var doNothing = function() {};
  setTimeout(doNothing);
});
// Demo route: throws from inside a timer callback, i.e. after the handler
// itself has already returned.
app.get('/error', function errorRoute(req, res) {
  // Kept as a plain function (not an arrow) so `this` is whatever the timer
  // supplies; `this.does` is undefined, so the call below throws.
  var explode = function() {
    // async error? pfft
    this.does.not.exist();
  };
  setTimeout(explode);
});
// Demo route: shows that helper code can look up the current micro-process
// instead of having the request passed to it explicitly.
app.get('/ctx', function ctxRoute(req, res) {
  // use the current process to pass around contextual information
  // => "you are calling GET /ctx"
  var message = whoAmI();
  res.send(message);
});
/**
 * Describe the request being served, using the name of the micro-process
 * that is current at the time of the call.
 *
 * @returns {string} "you are calling NAME"; when no micro-process is active a
 *   placeholder is used instead of a name.
 */
function whoAmI() {
  var proc = MicroProcess.current();
  // current() yields null when no domain is active, so guard before reading
  // the name rather than failing with a TypeError.
  if (!proc) {
    return 'you are calling from outside a micro-process';
  }
  return 'you are calling ' + proc.name;
}
app.listen(3000); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var EventEmitter = require('events').EventEmitter; | |
var inherits = require('util').inherits; | |
var domain = require('domain'); | |
// References to the native scheduling functions, captured when this module
// loads. MicroProcess.prototype.enter later replaces the setTimeout,
// setInterval and process.nextTick globals; these are the untouched originals.
// NOTE(review): _clearTimeout and _clearInterval are not used by the code
// visible in this file.
var nextTick = process.nextTick;
var _setTimeout = global.setTimeout;
var _setInterval = global.setInterval;
var _clearTimeout = global.clearTimeout;
var _clearInterval = global.clearInterval;
module.exports = MicroProcess; | |
function MicroProcess(name, parent) { | |
var mp = this; | |
var d = this.domain = domain.create(); | |
d.microProcess = this; | |
d.on('error', this.emit.bind(this, 'error')); | |
this.name = name; | |
this.pending = []; | |
var origEnter = d.enter; | |
d.enter = function() { | |
mp.enter(); | |
origEnter.apply(d, arguments); | |
} | |
var origExit = d.exit; | |
d.exit = function() { | |
mp.exit(); | |
origExit.apply(d, arguments); | |
} | |
var children = this.children = []; | |
var siblings; | |
if(parent) { | |
siblings = parent.children; | |
this.parent = parent; | |
this.pid = parent.pid++; | |
siblings.push(this); | |
this.once('exit', function() { | |
siblings.splice(siblings.indexOf(this), 1); | |
}.bind(this)); | |
} else { | |
this.pid = 1; | |
this.isRoot = true; | |
} | |
} | |
inherits(MicroProcess, EventEmitter); | |
/**
 * Execute `fn` through this process's domain.
 *
 * @param {Function} fn - function handed to the domain's run().
 */
MicroProcess.prototype.run = function(fn) {
  var owningDomain = this.domain;
  owningDomain.run(fn);
};
/**
 * Look up the micro-process attached to the currently active domain.
 *
 * @returns {MicroProcess|null|undefined} the `microProcess` property of
 *   `process.domain`, or null when no domain is active.
 */
MicroProcess.current = function() {
  var active = process.domain;
  if (!active) {
    return null;
  }
  return active.microProcess;
};
/**
 * Replace the global setTimeout, setInterval and process.nextTick with
 * versions that record every scheduled callback in this process's `pending`
 * list. exit() is responsible for putting the native functions back.
 *
 * A timeout or nextTick callback stays pending until it runs (or, for a
 * timeout, until its timer is closed). An interval callback stays pending
 * until its timer is closed, because it is expected to run again.
 */
MicroProcess.prototype.enter = function() {
  var mp = this;
  var slice = Array.prototype.slice;

  // Make timer.close() run `release` before the timer's own close().
  function hookClose(timer, release) {
    var origClose = timer.close;
    timer.close = function() {
      release();
      return origClose.apply(timer, arguments);
    };
    return timer;
  }

  global.setTimeout = function setTimeout(fn, n) {
    // Copy the arguments so delay and extra parameters reach the native
    // timer, and so `fn` keeps pointing at the caller's function.
    var args = slice.call(arguments);
    var tracked = mp.scheduleCallback(fn);
    var settled = false;

    args[0] = function timeoutCallback() {
      // `tracked` removes fn from pending itself; remember that so a later
      // close() does not try to remove it a second time.
      settled = true;
      return tracked.apply(this, arguments);
    };

    return hookClose(_setTimeout.apply(undefined, args), function release() {
      if (settled) {
        return;
      }
      settled = true;
      mp.unscheduleCallback(fn);
    });
  };

  global.setInterval = function setInterval(fn, n) {
    var args = slice.call(arguments);
    var released = false;

    mp.pending.push(fn);
    args[0] = function intervalCallback() {
      // Not routed through scheduleCallback: that wrapper unschedules on
      // every call, which is wrong for a callback that repeats.
      try {
        fn.apply(this, arguments);
      } catch (e) {
        mp.emit('error', e);
      }
    };

    return hookClose(_setInterval.apply(undefined, args), function release() {
      if (released) {
        return;
      }
      released = true;
      mp.unscheduleCallback(fn);
    });
  };

  process.nextTick = function _nextTick(fn) {
    var args = slice.call(arguments);
    args[0] = mp.scheduleCallback(fn);
    nextTick.apply(process, args);
  };
};
/**
 * Put the native scheduling functions back and, if no callbacks are still
 * pending, finish the process: mark it complete, dispose of the domain and
 * emit 'exit'. Completion happens at most once.
 */
MicroProcess.prototype.exit = function() {
  // Restore from the references saved at module load. Reading the bare
  // `setTimeout` / `setInterval` identifiers here would just fetch the
  // replacements that enter() installed on the global object.
  process.nextTick = nextTick;
  global.setInterval = _setInterval;
  global.setTimeout = _setTimeout;

  if (this.pending.length !== 0 || this.complete) {
    return;
  }

  this.complete = true;
  var emitExit = this.emit.bind(this, 'exit');

  if (typeof this.domain.dispose === 'function') {
    this.domain.once('dispose', emitExit);
    this.domain.dispose();
  } else {
    // Newer Node releases no longer provide domain.dispose(); report the
    // exit directly so listeners are still notified.
    emitExit();
  }
};
/**
 * Create a child process of this one and immediately run `main` in it.
 *
 * @param {string} name - name given to the child.
 * @param {Function} main - function passed to the child's run().
 * @returns {MicroProcess} the newly created child.
 */
MicroProcess.prototype.fork = function(name, main) {
  var child = new MicroProcess(name, this);
  child.run(main);
  return child;
};
/**
 * Record `fn` as pending work and build the function that should be
 * scheduled in its place.
 *
 * @param {Function} fn - the callback the caller wants to run later.
 * @returns {Function} a wrapper that first removes `fn` from the pending
 *   list, then calls it with the wrapper's own `this` and arguments. Anything
 *   `fn` throws is caught and emitted on this process as an 'error' event;
 *   the wrapper itself returns nothing.
 */
MicroProcess.prototype.scheduleCallback = function(fn) {
  var owner = this;
  owner.pending.push(fn);

  function callback() {
    owner.unscheduleCallback(fn);
    try {
      fn.apply(this, arguments);
    } catch (err) {
      owner.emit('error', err);
    }
  }

  return callback;
};
/**
 * Remove one occurrence of `fn` from the pending list.
 *
 * @param {Function} fn - a callback previously added to `pending`; a
 *   function that is not in the list is ignored.
 */
MicroProcess.prototype.unscheduleCallback = function(fn) {
  var index = this.pending.indexOf(fn);
  // indexOf returns -1 for an unknown fn, and splice(-1, 1) would silently
  // drop the last, unrelated, pending callback.
  if (index !== -1) {
    this.pending.splice(index, 1);
  }
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment