Skip to content

Instantly share code, notes, and snippets.

@orodio
Created March 28, 2019 16:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save orodio/0f3a59d4282fc1f9a9c4242ee6c9c182 to your computer and use it in GitHub Desktop.
Save orodio/0f3a59d4282fc1f9a9c4242ee6c9c182 to your computer and use it in GitHub Desktop.
const { GenServer, Ok, Reply, NoReply, Continue } = require("./gen-server")
var counts = {}
function UpdateCount(event) {
if (!(this instanceof UpdateCount)) return new UpdateCount(event)
this.count = event.count
}
const Counter = GenServer({
async init(ctx, id) {
const count = counts[id] || 0
const subscribers = new Set()
return Ok({ id, count, subscribers })
},
handleCast: {
async inc(ctx, state) {
state.count = state.count + 1
return NoReply(state, Continue("broadcast"))
},
async dec(ctx, state) {
state.count = state.count - 1
return NoReply(state, Continue("broadcast"))
},
async log(ctx, state) {
console.log(`Counter|${state.id}`, state.count)
return NoReply(state)
},
async subscribe(ctx, state, pid) {
state.subscribers.add(pid)
return NoReply(state, Continue("broadcast"))
},
async unsubscribe(ctx, state, pid) {
state.subscribers.delete(pid)
return NoReply(state)
},
},
handleCall: {
async getCount(ctx, _from, state) {
const returnValue = state.count
return Reply(returnValue, state)
},
},
handleContinue: {
async updateCache(ctx, state) {
counts[state.id] = state.count
return NoReply(state)
},
async broadcast(ctx, state) {
for (let sub of state.subscribers) send(sub, UpdateCount({ count: state.count }))
return NoReply(state, Continue("updateCache"))
},
},
})
Counter.start = id => GenServer.start(Counter, id, { name: `counter|${id}` })
Counter.inc = id => GenServer.cast(`counter|${id}`, "inc")
Counter.dec = id => GenServer.cast(`counter|${id}`, "dec")
Counter.log = id => GenServer.cast(`counter|${id}`, "log")
Counter.subscribe = (id, pid) => GenServer.cast(`counter|${id}`, "subscribe", pid)
Counter.unsubscribe = (id, pid) => GenServer.cast(`counter|${id}`, "unsubscribe", pid)
Counter.broadcast = id => GenServer.cast(`counter|${id}`, "broadcast")
Counter.getCount = id => GenServer.call(`counter|${id}`, "getCount")
exports.Counter = Counter
const { send, spawn } = require("./system")
exports.Ok = function Ok(state) {
if (!(this instanceof Ok)) return new Ok(state)
this.state = state
}
exports.Reply = function Reply(returnValue, state, more = null) {
if (!(this instanceof Reply)) return new Reply(returnValue, state, more)
this.returnValue = returnValue
this.state = state
this.more = more
}
exports.NoReply = function NoReply(state, more = null) {
if (!(this instanceof NoReply)) return new NoReply(state, more)
this.state = state
this.more = more
}
exports.Continue = function Continue(verb) {
if (!(this instanceof Continue)) return new Continue(verb)
this.verb = verb
}
exports.Exit = function Exit(reason, state) {
if (!(this instanceof Exit)) return new Exit(state)
this.reason = reason
this.state = state
}
exports.GenServerCast = function GenServerCast(verb, ...msg) {
if (!(this instanceof GenServerCast)) return new GenServerCast(verb, ...msg)
this.verb = verb
this.msg = msg
}
exports.GenServerCall = function GenServerCall(from, verb, ...msg) {
if (!(this instanceof GenServerCall)) return new GenServerCall(from, verb, ...msg)
this.from = from
this.verb = verb
this.msg = msg
}
exports.GenServerCallResponse = function GenServerCallResponse(returnValue) {
if (!(this instanceof GenServerCallResponse)) return new GenServerCallResponse(returnValue)
this.returnValue = returnValue
}
function Lifecycle(ok) {
if (ok instanceof Ok) this.state = ok.state
this.more = null
}
exports.GenServer = function GenServer(callbacks = {}) {
return async (ctx, lifecycle) => {
if (!(lifecycle instanceof Lifecycle)) return ctx.recurse(new Lifecycle(await callbacks.init(ctx, lifecycle)))
if (lifecycle.more instanceof Continue) {
const { verb } = lifecycle.more
const resp = await (callbacks.handleContinue || {})[verb](ctx, lifecycle.state)
if (resp instanceof NoReply) {
lifecycle.state = resp.state
lifecycle.more = resp.more
return ctx.recurse(lifecycle)
}
lifecycle.more = null
return ctx.recurse(lifecycle)
}
const req = await ctx.receive()
if (req instanceof GenServerCast) {
const { verb, msg } = req
const resp = await (callbacks.handleCast || {})[verb](ctx, lifecycle.state, ...msg)
if (resp instanceof NoReply) {
lifecycle.state = resp.state
lifecycle.more = resp.more
return ctx.recurse(lifecycle)
}
} else if (req instanceof GenServerCall) {
const { from, verb, msg } = req
const resp = await (callbacks.handleCall || {})[verb](ctx, from, lifecycle.state, ...msg)
if (resp instanceof NoReply) {
lifecycle.state = resp.state
lifecycle.more = resp.more
send(from, GenServerCallResponse(null))
return ctx.recurse(lifecycle)
}
if (resp instanceof Reply) {
lifecycle.state = resp.state
lifecycle.more = resp.more
send(from, GenServerCallResponse(resp.returnValue))
return ctx.recurse(lifecycle)
}
} else {
const resp = await (callbacks.handleInfo || {})[verb](ctx, lifecycle.state, ...msg)
if (resp instanceof NoReply) {
lifecycle.state = resp.state
lifecycle.more = resp.more
return ctx.recurse(lifecycle)
}
}
lifecycle.more = null
return ctx.recurse(lifecycle)
}
}
GenServer.start = (logic, state, opts = {}) => spawn(logic, state, opts)
GenServer.cast = (pid, verb, ...msg) => send(pid, GenServerCast(verb, msg))
GenServer.call = (pid, verb, ...msg) =>
new Promise(resolve => {
const from = spawn(async ({ receive }) => {
const msg = await receive()
resolve(msg.returnValue)
})
send(pid, GenServerCall(from, verb, ...msg))
})
exports.Mailbox = function Mailbox() {
var messages = []
var awaiting = undefined
const send = async msg => {
messages.push(msg)
if (awaiting) {
awaiting(messages.shift())
awaiting = undefined
}
}
const receive = () =>
new Promise(resolve => {
const msg = messages.shift()
if (msg) return resolve(msg)
awaiting = resolve
})
return { send, receive }
}
exports.Pid = function Pid() {
return (+new Date() + ~~(Math.random * 999999)).toString(36)
}
const { Pid } = require("./pid")
const { Mailbox } = require("./mailbox")
const rootMailbox = Mailbox()
;(async function __loop(state = { pids:{} }) {
const { pids } = state
const [type, ...msg] = await rootMailbox.receive()
return {
START: (state, [process]) => {
state.pids[process.pid] = state.pids[process.pid] || process
return __loop(state)
},
EXIT: (state, [pid, _reason, _error]) => {
delete state.pids[pid]
return __loop(state)
},
FORWARD_MSG: (state, [pid, msg]) => {
if (pids[pid] != null) {
pids[pid].mailbox.send(msg)
} else {
console.log("Unknown pid", pid)
}
return __loop(state)
},
}[type](state, msg)
})()
exports.spawn = function spawn(callback, state, opts = {}) {
const process = {
pid: opts.name || Pid(),
mailbox: Mailbox(),
}
rootMailbox.send(["START", process])
var ctx = {
self: process.pid,
recurse: async state => callback(ctx, state),
receive: process.mailbox.receive,
}
try {
async function __lifecycle(state) {
await ctx.recurse(state)
rootMailbox.send(["EXIT", process.pid, "normal"])
}
__lifecycle(state)
} catch (error) {
rootMailbox.send(["EXIT", process.pid, "unnatural", error])
}
return process.pid
}
exports.send = function send(pid, msg) {
rootMailbox.send(["FORWARD_MSG", pid, msg])
return msg
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment