Created
March 28, 2019 16:03
-
-
Save orodio/0f3a59d4282fc1f9a9c4242ee6c9c182 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { GenServer, Ok, Reply, NoReply, Continue } = require("./gen-server") | |
var counts = {} | |
function UpdateCount(event) { | |
if (!(this instanceof UpdateCount)) return new UpdateCount(event) | |
this.count = event.count | |
} | |
const Counter = GenServer({ | |
async init(ctx, id) { | |
const count = counts[id] || 0 | |
const subscribers = new Set() | |
return Ok({ id, count, subscribers }) | |
}, | |
handleCast: { | |
async inc(ctx, state) { | |
state.count = state.count + 1 | |
return NoReply(state, Continue("broadcast")) | |
}, | |
async dec(ctx, state) { | |
state.count = state.count - 1 | |
return NoReply(state, Continue("broadcast")) | |
}, | |
async log(ctx, state) { | |
console.log(`Counter|${state.id}`, state.count) | |
return NoReply(state) | |
}, | |
async subscribe(ctx, state, pid) { | |
state.subscribers.add(pid) | |
return NoReply(state, Continue("broadcast")) | |
}, | |
async unsubscribe(ctx, state, pid) { | |
state.subscribers.delete(pid) | |
return NoReply(state) | |
}, | |
}, | |
handleCall: { | |
async getCount(ctx, _from, state) { | |
const returnValue = state.count | |
return Reply(returnValue, state) | |
}, | |
}, | |
handleContinue: { | |
async updateCache(ctx, state) { | |
counts[state.id] = state.count | |
return NoReply(state) | |
}, | |
async broadcast(ctx, state) { | |
for (let sub of state.subscribers) send(sub, UpdateCount({ count: state.count })) | |
return NoReply(state, Continue("updateCache")) | |
}, | |
}, | |
}) | |
Counter.start = id => GenServer.start(Counter, id, { name: `counter|${id}` }) | |
Counter.inc = id => GenServer.cast(`counter|${id}`, "inc") | |
Counter.dec = id => GenServer.cast(`counter|${id}`, "dec") | |
Counter.log = id => GenServer.cast(`counter|${id}`, "log") | |
Counter.subscribe = (id, pid) => GenServer.cast(`counter|${id}`, "subscribe", pid) | |
Counter.unsubscribe = (id, pid) => GenServer.cast(`counter|${id}`, "unsubscribe", pid) | |
Counter.broadcast = id => GenServer.cast(`counter|${id}`, "broadcast") | |
Counter.getCount = id => GenServer.call(`counter|${id}`, "getCount") | |
exports.Counter = Counter |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { send, spawn } = require("./system") | |
exports.Ok = function Ok(state) { | |
if (!(this instanceof Ok)) return new Ok(state) | |
this.state = state | |
} | |
exports.Reply = function Reply(returnValue, state, more = null) { | |
if (!(this instanceof Reply)) return new Reply(returnValue, state, more) | |
this.returnValue = returnValue | |
this.state = state | |
this.more = more | |
} | |
exports.NoReply = function NoReply(state, more = null) { | |
if (!(this instanceof NoReply)) return new NoReply(state, more) | |
this.state = state | |
this.more = more | |
} | |
exports.Continue = function Continue(verb) { | |
if (!(this instanceof Continue)) return new Continue(verb) | |
this.verb = verb | |
} | |
exports.Exit = function Exit(reason, state) { | |
if (!(this instanceof Exit)) return new Exit(state) | |
this.reason = reason | |
this.state = state | |
} | |
exports.GenServerCast = function GenServerCast(verb, ...msg) { | |
if (!(this instanceof GenServerCast)) return new GenServerCast(verb, ...msg) | |
this.verb = verb | |
this.msg = msg | |
} | |
exports.GenServerCall = function GenServerCall(from, verb, ...msg) { | |
if (!(this instanceof GenServerCall)) return new GenServerCall(from, verb, ...msg) | |
this.from = from | |
this.verb = verb | |
this.msg = msg | |
} | |
exports.GenServerCallResponse = function GenServerCallResponse(returnValue) { | |
if (!(this instanceof GenServerCallResponse)) return new GenServerCallResponse(returnValue) | |
this.returnValue = returnValue | |
} | |
function Lifecycle(ok) { | |
if (ok instanceof Ok) this.state = ok.state | |
this.more = null | |
} | |
exports.GenServer = function GenServer(callbacks = {}) { | |
return async (ctx, lifecycle) => { | |
if (!(lifecycle instanceof Lifecycle)) return ctx.recurse(new Lifecycle(await callbacks.init(ctx, lifecycle))) | |
if (lifecycle.more instanceof Continue) { | |
const { verb } = lifecycle.more | |
const resp = await (callbacks.handleContinue || {})[verb](ctx, lifecycle.state) | |
if (resp instanceof NoReply) { | |
lifecycle.state = resp.state | |
lifecycle.more = resp.more | |
return ctx.recurse(lifecycle) | |
} | |
lifecycle.more = null | |
return ctx.recurse(lifecycle) | |
} | |
const req = await ctx.receive() | |
if (req instanceof GenServerCast) { | |
const { verb, msg } = req | |
const resp = await (callbacks.handleCast || {})[verb](ctx, lifecycle.state, ...msg) | |
if (resp instanceof NoReply) { | |
lifecycle.state = resp.state | |
lifecycle.more = resp.more | |
return ctx.recurse(lifecycle) | |
} | |
} else if (req instanceof GenServerCall) { | |
const { from, verb, msg } = req | |
const resp = await (callbacks.handleCall || {})[verb](ctx, from, lifecycle.state, ...msg) | |
if (resp instanceof NoReply) { | |
lifecycle.state = resp.state | |
lifecycle.more = resp.more | |
send(from, GenServerCallResponse(null)) | |
return ctx.recurse(lifecycle) | |
} | |
if (resp instanceof Reply) { | |
lifecycle.state = resp.state | |
lifecycle.more = resp.more | |
send(from, GenServerCallResponse(resp.returnValue)) | |
return ctx.recurse(lifecycle) | |
} | |
} else { | |
const resp = await (callbacks.handleInfo || {})[verb](ctx, lifecycle.state, ...msg) | |
if (resp instanceof NoReply) { | |
lifecycle.state = resp.state | |
lifecycle.more = resp.more | |
return ctx.recurse(lifecycle) | |
} | |
} | |
lifecycle.more = null | |
return ctx.recurse(lifecycle) | |
} | |
} | |
GenServer.start = (logic, state, opts = {}) => spawn(logic, state, opts) | |
GenServer.cast = (pid, verb, ...msg) => send(pid, GenServerCast(verb, msg)) | |
GenServer.call = (pid, verb, ...msg) => | |
new Promise(resolve => { | |
const from = spawn(async ({ receive }) => { | |
const msg = await receive() | |
resolve(msg.returnValue) | |
}) | |
send(pid, GenServerCall(from, verb, ...msg)) | |
}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
exports.Mailbox = function Mailbox() { | |
var messages = [] | |
var awaiting = undefined | |
const send = async msg => { | |
messages.push(msg) | |
if (awaiting) { | |
awaiting(messages.shift()) | |
awaiting = undefined | |
} | |
} | |
const receive = () => | |
new Promise(resolve => { | |
const msg = messages.shift() | |
if (msg) return resolve(msg) | |
awaiting = resolve | |
}) | |
return { send, receive } | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
exports.Pid = function Pid() { | |
return (+new Date() + ~~(Math.random * 999999)).toString(36) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { Pid } = require("./pid") | |
const { Mailbox } = require("./mailbox") | |
const rootMailbox = Mailbox() | |
;(async function __loop(state = { pids:{} }) { | |
const { pids } = state | |
const [type, ...msg] = await rootMailbox.receive() | |
return { | |
START: (state, [process]) => { | |
state.pids[process.pid] = state.pids[process.pid] || process | |
return __loop(state) | |
}, | |
EXIT: (state, [pid, _reason, _error]) => { | |
delete state.pids[pid] | |
return __loop(state) | |
}, | |
FORWARD_MSG: (state, [pid, msg]) => { | |
if (pids[pid] != null) { | |
pids[pid].mailbox.send(msg) | |
} else { | |
console.log("Unknown pid", pid) | |
} | |
return __loop(state) | |
}, | |
}[type](state, msg) | |
})() | |
exports.spawn = function spawn(callback, state, opts = {}) { | |
const process = { | |
pid: opts.name || Pid(), | |
mailbox: Mailbox(), | |
} | |
rootMailbox.send(["START", process]) | |
var ctx = { | |
self: process.pid, | |
recurse: async state => callback(ctx, state), | |
receive: process.mailbox.receive, | |
} | |
try { | |
async function __lifecycle(state) { | |
await ctx.recurse(state) | |
rootMailbox.send(["EXIT", process.pid, "normal"]) | |
} | |
__lifecycle(state) | |
} catch (error) { | |
rootMailbox.send(["EXIT", process.pid, "unnatural", error]) | |
} | |
return process.pid | |
} | |
exports.send = function send(pid, msg) { | |
rootMailbox.send(["FORWARD_MSG", pid, msg]) | |
return msg | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment