|
var net = require('net') |
|
var url = require('url') |
|
|
|
function Closure(f, env) { |
|
var self = { |
|
serialize: function() { |
|
var lines = [] |
|
for (var k in env) { |
|
lines.push("var " + k + " = " + serialize(env[k])) |
|
} |
|
lines.push("return " + f.toString()) |
|
return "(function(){" + lines.join(";") + "})()" |
|
} |
|
} |
|
return self |
|
} |
|
|
|
function Actor(node, def, args, next) { |
|
var mailbox = [] |
|
var waiting = null |
|
var pid = Actor.nextPid() |
|
|
|
var self = { |
|
id: function() { return node.id() + "/" + pid }, |
|
node: function() { return node }, |
|
expect: function(handle) { |
|
var message = mailbox.shift() |
|
if (message) handle.call(self, message); |
|
else waiting = handle; |
|
}, |
|
post: function(message) { |
|
if (waiting) { |
|
var handle = waiting |
|
waiting = null |
|
handle.call(self, message) |
|
} else mailbox.push(message) |
|
}, |
|
send: function(to, message) { Actor.send(to, message) } |
|
} |
|
|
|
Actor.lookup[self.id()] = self |
|
if (next) next(self); |
|
|
|
// Run after caller of Actor |
|
def.apply(null, [self].concat(args)) |
|
} |
|
|
|
Actor.lookup = {} |
|
Actor.nextPid = (function(start) { |
|
return function() { return ++start; } |
|
})(0) |
|
|
|
Actor.send = function(pid, msg) { |
|
if (Actor.lookup[pid]) { |
|
Actor.lookup[pid].post(msg); |
|
} |
|
else { |
|
var parts = url.parse(pid) |
|
var target = parts.protocol + "//" + parts.host |
|
var conn = Endpoint.connections[target] |
|
conn.write(["MESG", target, parts.path.slice(1), |
|
JSON.stringify(msg)].join(" ") + "\r\n") |
|
} |
|
} |
|
|
|
function Endpoint(port, next) { |
|
var q = [] |
|
net.createServer(function(conn) { |
|
conn.on("data", Endpoint.onData(conn)) |
|
}).listen(port, function(conn) { |
|
next(conn) |
|
}) |
|
} |
|
Endpoint.onData = function(conn) { |
|
var acc = [] |
|
return function(buf) { |
|
var parts = buf.toString().split("\r\n") |
|
if (parts.length == 1) acc.push(parts[0]); |
|
else { |
|
var head = acc.join("") + parts[0] |
|
var tail = parts.slice(1) |
|
var last = tail.pop() |
|
if (last.length > 0) acc = last; |
|
[head].concat(tail).forEach(function(m) { |
|
var message = deserialize(m) |
|
message.connection = conn |
|
dispatcher(message) |
|
}) |
|
} |
|
} |
|
} |
|
Endpoint.connections = {} |
|
Endpoint.connect = function(from, to, next) { |
|
var conn = Endpoint.connections[to] |
|
if (conn) { |
|
if (conn.isConnected) next(conn); |
|
else conn.on("connect", function() { next(conn) }); |
|
} else { |
|
var options = url.parse(to) |
|
options.host = options.hostname |
|
var conn = net.connect(options, function() { |
|
conn.isConnected = true |
|
conn.write(["CONN", from, to].join(" ") + "\r\n") |
|
next(conn) |
|
}) |
|
conn.on("data", Endpoint.onData(conn)) |
|
Endpoint.connections[to] = conn |
|
} |
|
} |
|
|
|
function dispatcher(message) { |
|
if (message.type == "MESG") { |
|
Actor.send(message.nid + "/" + message.pid, message.data) |
|
} else { |
|
var node = Node.lookup[message.nid] |
|
if (node) node.send(message); |
|
else { |
|
// notify sender |
|
} |
|
} |
|
} |
|
|
|
function deserialize(bytes) { |
|
var type = bytes.slice(0, 4) |
|
var rem = bytes.slice(5) |
|
switch (type) { |
|
case "CONN": |
|
var pair = rem.split(" ") |
|
return { type: type, nid: pair[1], from: pair[0] } |
|
case "SPWN": |
|
var parts = rem.split(" ") |
|
var from = parts[0] |
|
var nid = parts[1] |
|
var ref = parts[2] |
|
var payload = rem.slice(from.length + nid.length + ref.length + 3) |
|
var meta = JSON.parse(payload) |
|
eval("var closure = " + meta.body) |
|
return { type: type, from: from, nid: nid, closure: closure, ref: ref } |
|
case "NOTE": |
|
var pair = rem.split(" ") |
|
return { type: type, nid: pair[0], ref: pair[1], pid: pair[2] } |
|
case "MESG": |
|
var pair = rem.split(" ") |
|
var payload = rem.slice(pair[0].length + pair[1].length + 2) |
|
return { type: type, nid: pair[0], pid: pair[1], data: JSON.parse(payload) } |
|
} |
|
} |
|
|
|
function serialize(data) { |
|
if (typeof data == "function") { |
|
return JSON.stringify({ type: "function", body: data.toString() }) |
|
} else if (data.serialize) { |
|
return JSON.stringify({ type: "function", body: data.serialize() }) |
|
} else return JSON.stringify(data); |
|
} |
|
|
|
function Node(nid, next) { |
|
var nextRef = (function(i) { return function() { return ++i } })(0) |
|
var refs = {} |
|
var self = { |
|
id: function() { return nid }, |
|
send: function(message) { |
|
self.controller.post(message) |
|
}, |
|
spawnLocal: function(f, args, next) { |
|
Actor(self, f, args, function(actor) { |
|
if (next) next(actor.id()); |
|
}) |
|
}, |
|
spawn: function(remote, closure, args, next) { |
|
Endpoint.connect(nid, remote, function(conn) { |
|
var ref = nextRef() |
|
refs[ref] = next |
|
conn.write(["SPWN", nid, remote, ref, |
|
serialize(closure)].join(" ") + "\r\n") |
|
}) |
|
} |
|
} |
|
Actor(self, function go(p) { |
|
p.expect(function(msg) { |
|
switch (msg.type) { |
|
case "CONN": |
|
Endpoint.connections[msg.from] = msg.connection |
|
go(p) |
|
break |
|
case "SPWN": |
|
self.spawnLocal(msg.closure, [], function(pid) { |
|
msg.connection.write(["NOTE", msg.from, |
|
msg.ref, pid].join(" ") + "\r\n") |
|
go(p) |
|
}) |
|
break |
|
case "NOTE": |
|
if (refs[msg.ref]) refs[msg.ref](msg.pid); |
|
go(p) |
|
break |
|
} |
|
}) |
|
}, [], function(controller) { |
|
self.controller = controller |
|
Node.lookup[nid] = self |
|
var options = url.parse(nid) |
|
Endpoint(options.port, function() { next(self) }) |
|
}) |
|
} |
|
Node.lookup = {} |
|
|
|
exports.Actor = Actor |
|
exports.Node = Node |
|
exports.Endpoint = Endpoint |
|
exports.Closure = Closure |
Where is the EROR/SPWN/etc protocol code that does the remote stuff?