Skip to content

Instantly share code, notes, and snippets.

@luciferous
Last active December 28, 2015 11:28
Show Gist options
  • Save luciferous/7493141 to your computer and use it in GitHub Desktop.
Save luciferous/7493141 to your computer and use it in GitHub Desktop.
Distributed Javascript

distributed.js

An example of how an Erlang-inspired distributed actor framework could work in Node.js.

Running the example

Start slave nodes.

$ node example.js slave localhost:3000 &
$ node example.js slave localhost:3001 &
$ node example.js slave localhost:3002 &

Start master.

$ node example.js master localhost:4000 nid://localhost:3000 nid://localhost:3001 nid://localhost:3002

Master output.

Listening on nid://localhost:4000
Expected: 2379
Spawned ping on [nid://localhost:3000/2]
Spawned sum on [nid://localhost:3000/3]
Spawned ping on [nid://localhost:3001/2]
Spawned sum on [nid://localhost:3001/3]
Spawned ping on [nid://localhost:3002/2]
got all pongs
Spawned sum on [nid://localhost:3002/3]
Actual: 2379

Slave output.

Listening on nid://localhost:3000
Listening on nid://localhost:3001
Listening on nid://localhost:3002
Received: [ 50, 92, 52, 51, 13, 58, 73, 73, 15, 7, 1, 96, 21, 35, 4, 75, 63 ]
got ping
Received: [ 17, 15, 6, 11, 19, 76, 38, 78, 74, 24, 35, 54, 50, 35, 43, 47, 96 ]
got ping
Received: [ 77, 57, 9, 65, 57, 78, 49, 26, 19, 79, 83, 88, 73, 27, 17, 78 ]
got ping
var net = require('net')
var url = require('url')
/**
 * Pairs a function with the free variables it relies on so that both can be
 * rendered as one piece of source text.
 *
 * @param {Function} f - Function whose source is embedded via `f.toString()`.
 * @param {Object} env - Map of variable name to value; every enumerable key
 *   becomes a `var` declaration placed in front of the function.
 * @returns {{serialize: function(): string}} Object exposing `serialize()`.
 */
function Closure(f, env) {
  // Renders one captured variable as a `var` statement.
  function declare(name) {
    return "var " + name + " = " + serialize(env[name]);
  }

  return {
    // Builds an immediately-invoked function expression: evaluating it
    // declares the captured variables and yields `f`.
    serialize: function() {
      var statements = [];
      for (var name in env) {
        statements.push(declare(name));
      }
      statements.push("return " + f.toString());
      return "(function(){" + statements.join(";") + "})()";
    }
  };
}
/**
 * Creates an actor: a mailbox plus a body function that consumes messages
 * through `expect`.
 *
 * The actor is registered in `Actor.lookup` under `node.id() + "/" + pid`.
 *
 * @param {Object} node - Owning node; only `node.id()` is used here.
 * @param {Function} def - Actor body, invoked as `def(self, ...args)`.
 * @param {Array} args - Extra arguments appended after `self` for `def`.
 * @param {Function} [next] - Called with the actor object before `def` runs.
 */
function Actor(node, def, args, next) {
  var mailbox = [];
  var waiting = null;
  var pid = Actor.nextPid();
  var self = {
    id: function() { return node.id() + "/" + pid; },
    node: function() { return node; },
    // Delivers the oldest queued message to `handle`, or parks `handle`
    // until the next `post`.
    expect: function(handle) {
      // Test the queue length, not the dequeued value, so that falsy
      // messages (0, "", false, null) are delivered instead of being lost.
      if (mailbox.length > 0) handle.call(self, mailbox.shift());
      else waiting = handle;
    },
    // Hands `message` to the parked handler if there is one, else queues it.
    post: function(message) {
      if (waiting) {
        var handle = waiting;
        // Clear before invoking: the handler usually calls `expect` again.
        waiting = null;
        handle.call(self, message);
      } else {
        mailbox.push(message);
      }
    },
    send: function(to, message) { Actor.send(to, message); }
  };
  Actor.lookup[self.id()] = self;
  // The caller's callback runs first so it sees the actor before the body
  // starts executing.
  if (next) next(self);
  def.apply(null, [self].concat(args));
}

// Registry of actors created in this process, keyed by full actor id.
Actor.lookup = {};

// Returns a fresh process-wide actor number: 1, 2, 3, ...
Actor.nextPid = (function(start) {
  return function() { return ++start; };
})(0);

/**
 * Delivers `msg` to the actor identified by `pid`.
 *
 * Actors found in `Actor.lookup` receive the message directly. Otherwise the
 * message is written as a `MESG` line to the cached connection for the
 * actor's node.
 *
 * @param {string} pid - Full actor id, e.g. "nid://host:port/3".
 * @param {*} msg - JSON-serializable message.
 * @throws {Error} If the actor is not local and no connection to its node
 *   is cached in `Endpoint.connections`.
 */
Actor.send = function(pid, msg) {
  var local = Actor.lookup[pid];
  if (local) {
    local.post(msg);
    return;
  }
  var parts = url.parse(pid);
  var target = parts.protocol + "//" + parts.host;
  var conn = Endpoint.connections[target];
  if (!conn) {
    // Previously this surfaced as an opaque TypeError from `conn.write`.
    throw new Error("No connection to " + target + " for actor " + pid);
  }
  conn.write(["MESG", target, parts.path.slice(1),
    JSON.stringify(msg)].join(" ") + "\r\n");
};
/**
 * Starts a TCP server on `port` and feeds everything received on accepted
 * connections through the line protocol handled by `Endpoint.onData`.
 *
 * @param {number|string} port - Port to listen on.
 * @param {Function} next - Called with no arguments once listening.
 */
function Endpoint(port, next) {
  net.createServer(function(conn) {
    conn.on("data", Endpoint.onData(conn));
  }).listen(port, function() {
    next();
  });
}

/**
 * Builds a "data" handler for `conn` that reassembles "\r\n"-terminated
 * frames from arbitrary chunks, deserializes each complete frame, tags the
 * result with `connection = conn` and passes it to `dispatcher`.
 *
 * @param {Object} conn - Connection the frames arrive on.
 * @returns {function(Buffer)} Chunk handler with its own reassembly buffer.
 */
Endpoint.onData = function(conn) {
  // Text received so far that has not yet been terminated by "\r\n".
  // NOTE: each chunk is decoded on its own, so a multi-byte character split
  // across two chunks would not survive.
  var pending = "";
  return function(buf) {
    pending += buf.toString();
    var frames = pending.split("\r\n");
    // The last piece is the unterminated remainder ("" when the chunk ended
    // on a delimiter); keep it for the next chunk. Splitting the whole buffer
    // also catches a "\r\n" that straddles two chunks.
    pending = frames.pop();
    frames.forEach(function(frame) {
      if (frame.length === 0) return;
      var message = deserialize(frame);
      // deserialize yields nothing for unknown frame types; skip those.
      if (!message) return;
      message.connection = conn;
      dispatcher(message);
    });
  };
};

// Cache of connections to other nodes, keyed by node id ("nid://host:port").
Endpoint.connections = {};

/**
 * Obtains a connection to node `to`, reusing a cached one when available,
 * and passes it to `next` once it is usable.
 *
 * A newly opened connection announces itself with a `CONN from to` line.
 *
 * @param {string} from - Id of the local node.
 * @param {string} to - Id of the remote node.
 * @param {function(Object)} next - Receives the connection.
 */
Endpoint.connect = function(from, to, next) {
  var existing = Endpoint.connections[to];
  if (existing) {
    // `isConnected` is only set on sockets opened below. A cached socket that
    // is not in the middle of connecting (`connecting === false`) will never
    // emit "connect" again, so waiting for that event would hang forever.
    if (existing.isConnected || existing.connecting === false) next(existing);
    else existing.on("connect", function() { next(existing); });
    return;
  }
  var options = url.parse(to);
  options.host = options.hostname;
  var conn = net.connect(options, function() {
    conn.isConnected = true;
    conn.write(["CONN", from, to].join(" ") + "\r\n");
    next(conn);
  });
  conn.on("data", Endpoint.onData(conn));
  Endpoint.connections[to] = conn;
};
/**
 * Routes one deserialized protocol message.
 *
 * `MESG` messages go to the addressed actor through `Actor.send`; every other
 * type is handed to the node registered under `message.nid`. Messages for an
 * unregistered node are dropped.
 *
 * @param {Object} message - Result of `deserialize`.
 */
function dispatcher(message) {
  if (message.type == "MESG") {
    var actorId = message.nid + "/" + message.pid;
    Actor.send(actorId, message.data);
    return;
  }

  var target = Node.lookup[message.nid];
  if (!target) {
    // notify sender
    return;
  }
  target.send(message);
}
/**
 * Parses one protocol line (without its trailing "\r\n") into a message.
 *
 * Line formats, fields separated by single spaces:
 *   CONN <from> <nid>
 *   SPWN <from> <nid> <ref> <json {body: source}>
 *   NOTE <nid> <ref> <pid>
 *   MESG <nid> <pid> <json>
 *
 * @param {string} bytes - One protocol line.
 * @returns {Object|undefined} Parsed message, or undefined when the 4-letter
 *   type is not recognized.
 * @throws {SyntaxError} If a JSON payload or a spawned function body is
 *   malformed.
 */
function deserialize(bytes) {
  var type = bytes.slice(0, 4);
  var rem = bytes.slice(5);
  var fields = rem.split(" ");
  var payload;
  switch (type) {
    case "CONN":
      return { type: type, nid: fields[1], from: fields[0] };
    case "SPWN":
      var from = fields[0];
      var nid = fields[1];
      var ref = fields[2];
      // The payload may itself contain spaces, so cut it out by offset
      // instead of taking it from the split.
      payload = rem.slice(from.length + nid.length + ref.length + 3);
      var meta = JSON.parse(payload);
      // SECURITY: this executes source code received from the network. Any
      // peer that can reach the endpoint can run arbitrary code in this
      // process; only expose the endpoint to trusted nodes.
      // The body is evaluated as an expression so the result does not depend
      // on sloppy-mode `var` leaking out of eval (which breaks in strict mode).
      var closure = eval("(" + meta.body + "\n)");
      return { type: type, from: from, nid: nid, closure: closure, ref: ref };
    case "NOTE":
      return { type: type, nid: fields[0], ref: fields[1], pid: fields[2] };
    case "MESG":
      payload = rem.slice(fields[0].length + fields[1].length + 2);
      return { type: type, nid: fields[0], pid: fields[1], data: JSON.parse(payload) };
    default:
      return undefined;
  }
}
/**
 * Encodes a value as JSON text for the wire.
 *
 * Functions, and objects that provide a `serialize()` method (such as the
 * result of `Closure`), are encoded as `{type: "function", body: <source>}`.
 * Everything else goes straight through `JSON.stringify`.
 *
 * @param {*} data - Value to encode.
 * @returns {string|undefined} JSON text (`undefined` for `undefined` input,
 *   as `JSON.stringify` returns).
 */
function serialize(data) {
  if (typeof data === "function") {
    return JSON.stringify({ type: "function", body: data.toString() });
  }
  // Guard against null/undefined (property access would throw) and against
  // plain data that merely has a non-callable `serialize` property.
  if (data != null && typeof data.serialize === "function") {
    return JSON.stringify({ type: "function", body: data.serialize() });
  }
  return JSON.stringify(data);
}
/**
 * Creates a node: a named endpoint that can host actors and ask other nodes
 * to spawn actors on its behalf.
 *
 * A controller actor is started to process `CONN`, `SPWN` and `NOTE`
 * messages. The node is registered in `Node.lookup` and starts listening on
 * the port taken from `nid`.
 *
 * @param {string} nid - Node id of the form "nid://host:port".
 * @param {function(Object)} next - Called with the node once it is listening.
 */
function Node(nid, next) {
  var nextRef = (function(i) { return function() { return ++i; }; })(0);
  // Callbacks of spawn requests that have not been acknowledged yet, keyed
  // by request ref.
  var refs = {};
  var self = {
    id: function() { return nid; },
    // Queues a control message for the controller actor.
    send: function(message) {
      self.controller.post(message);
    },
    // Starts actor `f` on this node; `next` receives the new actor's id.
    spawnLocal: function(f, args, next) {
      Actor(self, f, args, function(actor) {
        if (next) next(actor.id());
      });
    },
    // Asks node `remote` to start `closure`; `next` receives the id reported
    // back in the matching NOTE. `args` is not transmitted: the remote side
    // always starts the actor without extra arguments.
    spawn: function(remote, closure, args, next) {
      Endpoint.connect(nid, remote, function(conn) {
        var ref = nextRef();
        refs[ref] = next;
        conn.write(["SPWN", nid, remote, ref,
          serialize(closure)].join(" ") + "\r\n");
      });
    }
  };
  Actor(self, function go(p) {
    p.expect(function(msg) {
      switch (msg.type) {
        case "CONN":
          // Remember the inbound socket so replies to `msg.from` reuse it.
          Endpoint.connections[msg.from] = msg.connection;
          go(p);
          break;
        case "SPWN":
          self.spawnLocal(msg.closure, [], function(pid) {
            msg.connection.write(["NOTE", msg.from,
              msg.ref, pid].join(" ") + "\r\n");
            go(p);
          });
          break;
        case "NOTE":
          var done = refs[msg.ref];
          // Each ref is answered once; drop the entry so callbacks do not
          // pile up and a duplicate NOTE cannot fire one twice.
          delete refs[msg.ref];
          if (done) done(msg.pid);
          go(p);
          break;
        default:
          // Keep the loop alive when an unknown message type shows up.
          go(p);
      }
    });
  }, [], function(controller) {
    self.controller = controller;
    Node.lookup[nid] = self;
    var options = url.parse(nid);
    Endpoint(options.port, function() { next(self); });
  });
}

// Nodes created in this process, keyed by node id.
Node.lookup = {};
exports.Actor = Actor
exports.Node = Node
exports.Endpoint = Endpoint
exports.Closure = Closure
var Actor = require('./distributed.js').Actor
var Node = require('./distributed.js').Node
var Closure = require('./distributed.js').Closure
/**
 * Sequential asynchronous map.
 *
 * Calls `f(item, done)` for one item at a time; the value passed to `done`
 * is collected, and only then is the following item started.
 *
 * @param {Array} items - Values to process, in order.
 * @param {function(*, function(*))} f - Worker taking an item and a callback.
 * @param {function(Array)} next - Receives the collected results.
 */
function map_(items, f, next) {
  var results = [];

  function step(pending) {
    if (pending.length === 0) {
      next(results);
      return;
    }
    f(pending[0], function(value) {
      results.push(value);
      step(pending.slice(1));
    });
  }

  step(items);
}
/**
 * Actor body that answers every incoming message with `{ from: <own id> }`,
 * sent to the message's `from` address, and then waits for the next one.
 *
 * @param {Object} self - The actor running this body.
 */
function ping(self) {
  function onPing(request) {
    console.log("got ping");
    self.send(request.from, { from: self.id() });
    // Re-arm for the following message.
    ping(self);
  }

  self.expect(onPing);
}
/**
 * Actor body for the ping/pong demo: spawns a `ping` actor on every worker
 * node, pings each one, and reports once a pong has come back from all of
 * them.
 *
 * @param {Object} self - The actor running this body.
 * @param {string[]} workers - Node ids to spawn `ping` on.
 */
function pongMain(self, workers) {
  var node = self.node();

  // Spawns `ping` on one worker and sends it the first message.
  function pingExample(worker, next) {
    node.spawn(worker, ping, [], function(them) {
      console.log("Spawned ping on [" + them + "]");
      self.send(them, { from: self.id() });
      next(them);
    });
  }

  // Consumes pongs until every pid in `pids` has answered.
  function waitForPongs(pids, next) {
    if (pids.length === 0) next();
    else self.expect(function(pong) {
      // The predicate has to return its result: without `return` every pid
      // was filtered out and the first pong ended the wait.
      var outstanding = pids.filter(function(pid) { return pid !== pong.from; });
      waitForPongs(outstanding, next);
    });
  }

  map_(workers, pingExample, function(pids) {
    waitForPongs(pids, function() { console.log("got all pongs"); });
  });
}
/**
 * Actor body for the distributed-sum demo: generates 50 random integers,
 * hands each worker node a slice to add up, and prints the combined total
 * next to the locally computed one.
 *
 * @param {Object} self - The actor running this body.
 * @param {string[]} workers - Node ids that each sum one slice.
 */
function sumMain(self, workers) {
  var node = self.node();
  var numbers = [];
  while (numbers.length < 50) numbers.push(Math.floor(Math.random() * 100));
  console.log("Expected:", numbers.reduce(function(x, y) { return x + y; }, 0));
  var portion = Math.ceil(numbers.length / workers.length);

  // Ships the next slice of `numbers` to `worker`.
  function sumExample(worker, next) {
    // This function is sent as source text, so it may only use its own
    // parameter and the names listed in the environment (`us`, `numbers`).
    var closure = Closure(function(self) {
      console.log("Received:", numbers);
      // Start from 0: with more workers than numbers a slice can be empty.
      var subtotal = numbers.reduce(function(x, y) { return x + y }, 0);
      self.send(us, { subtotal: subtotal });
    }, { us: self.id(), numbers: numbers.slice(0, portion) });
    numbers = numbers.slice(portion);
    node.spawn(worker, closure, [], function(them) {
      console.log("Spawned sum on [" + them + "]");
      next(them);
    });
  }

  map_(workers, sumExample, function(pids) {
    // Gathers one subtotal per spawned worker. Completion is checked before
    // waiting so that an empty worker list finishes instead of blocking.
    function collect(acc, done) {
      if (acc.length === pids.length) {
        done(acc);
        return;
      }
      self.expect(function(message) {
        acc.push(message.subtotal);
        collect(acc, done);
      });
    }

    collect([], function(subtotals) {
      var total = subtotals.reduce(function(x, y) { return x + y; }, 0);
      console.log("Actual:", total);
    });
  });
}
/**
 * Prints the command-line synopsis of the example program.
 */
function usage() {
  var lines = [
    "Usage: example.js slave host",
    " example.js master host [workers...]"
  ];
  lines.forEach(function(line) {
    console.log(line);
  });
}
// Entry point: `example.js <mode> <host> [workers...]`.
// Starts a node on <host>; in master mode it additionally runs both demos
// against the listed worker nodes.
if (process.argv) {
  // Copy with slice() rather than splice() so the process-wide argv array is
  // left intact for any other code that reads it.
  var args = process.argv.slice(2);
  var mode = args.shift();
  var host = args.shift();
  // Both positional arguments are required, and the mode has to be one of
  // the two documented ones; a mistyped mode used to start a slave silently.
  if (host === undefined || (mode !== "master" && mode !== "slave")) usage();
  else {
    Node("nid://" + host, function(node) {
      console.log("Listening on " + node.id());
      if (mode === "master") {
        // Whatever remains in `args` is the list of worker node ids.
        node.spawnLocal(pongMain, [args]);
        node.spawnLocal(sumMain, [args]);
      }
    });
  }
}
@fitzgen
Copy link

fitzgen commented Nov 15, 2013

Where is the EROR/SPWN/etc protocol code that does the remote stuff?

@luciferous
Copy link
Author

@fitzgen See the update.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment