Skip to content

Instantly share code, notes, and snippets.

@jcmoore
Last active September 24, 2016 01:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jcmoore/9eed5bd1e5892fc4d995848ef3cda2d3 to your computer and use it in GitHub Desktop.
Save jcmoore/9eed5bd1e5892fc4d995848ef3cda2d3 to your computer and use it in GitHub Desktop.
Simple message-bus implementation to enable faux-microservices
var figurative = (function () {
var writeGuard = {};
var invalidChannel = typeof ShuttleBus !== "undefined" && typeof ShuttleBus.init === "function" ?
ShuttleBus.init(null, null, null, null) : {
init: function () {
return invalidChannel;
},
isInService: function () { return 0|false; },
ruin: function () { return 0|false; },
obit: function () { return null; },
scan: function () { return null; },
on: function () { return null; },
off: function () { return null; },
watch: function () { return null; },
ignore: function () { return null; },
shuttle: function () { return null; },
reroute: function () {
return invalidChannel;
},
branch: function () {
return invalidChannel;
},
};
function barrierReadOnly (directive, kind, route, data, bus) {
switch (String(directive === null || directive === void 0 ? "" : directive)) {
case "bus": return 0|(data === writeGuard ? false : true); // prevent shuttling
}
return 0|false;
}
function FigurativeIterated (done, value) {
this.done = Boolean(done);
this.value = value;
}
function FigurativeIterable (next) {
this.next = null;
this.next = next;
}
function ImmutableLabel (id, ns) {
this._id = String(id === null || id === void 0 ? "" : id);
this._ns = String(ns === null || ns === void 0 ? "" : ns);
}
ImmutableLabel.prototype.id = function () {
return this._id;
};
ImmutableLabel.prototype.ns = function () {
return this._ns;
};
var invalidLabel = new ImmutableLabel("", "");
var invalidLabelMapTriplet = [new Map(), null, ""];
var invalidLabelPair = [invalidLabel, null];
var lazyLabelKinds = new Map();
var lazyLabelSpaces = new Map();
var labelChannel = invalidChannel.init(null, null, null, null);
function NamespaceMessage (ns) {
this.ns = String(ns === null || ns === void 0 ? "" : ns);
}
function LabelMessage (label) {
this.label = label;
}
function persistentLabel(id, ns, signature, registration) {
var space = String(ns === null || ns === void 0 ? "" : ns);
var name = String(id === null || id === void 0 ? "" : id);
var labelPair = invalidLabelPair;
var result = invalidLabelPair[0];
var has = lazyLabelSpaces.has(space);
var mapTriplet = has ? lazyLabelSpaces.get(space) : [new Map(), registration || null, space];
var labels = mapTriplet[0];
if (!has) lazyLabelSpaces.set(space, mapTriplet);
else if (!mapTriplet[1]) mapTriplet[1] = registration || null;
else if (registration && mapTriplet[1] !== registration) return null; // reg refused
labelPair = labels.get(name) || invalidLabelPair;
result = labelPair[0] || invalidLabelPair[0];
if (labelPair === invalidLabelPair) {
if (!lazyLabelKinds.has(space)) result = new (novelLabelKind(space))(name);
else result = new (lazyLabelKinds.get(space))(name);
labelChannel.shuttle("label", new LabelMessage(result));
labelPair = [result, signature || null];
labels.set(name, labelPair);
} else if (!labelPair[1]) labelPair[1] = signature || null;
else if (signature && labelPair[1] !== signature) return null; // sig refused
return result;
}
function novelLabelKind(ns) {
var space = String(ns === null || ns === void 0 ? "" : ns);
function SharedLabel (id) {
var name = String(id === null || id === void 0 ? "" : id);
ImmutableLabel.call(this, name, space);
}
SharedLabel.prototype.id = ImmutableLabel.prototype.id;
SharedLabel.prototype.ns = ImmutableLabel.prototype.ns;
lazyLabelKinds.set(space, SharedLabel);
labelChannel.shuttle("namespace", new NamespaceMessage(space));
return SharedLabel;
}
function validateLabel (label) {
if (label.id !== ImmutableLabel.prototype.id) return 0|false;
else if (label.ns !== ImmutableLabel.prototype.ns) return 0|false;
else if (!lazyLabelSpaces.has(label.ns())) return 0|false;
else if (lazyLabelSpaces.get(label.ns()).get(label.id()) !== label) return 0|false;
else return 0|true;
}
function labelScanner (route, mode, channel) {
return labelNamespaceIterable(lazyLabelSpaces.values());
}
function labelNamespaceIterable (iterator) {
return new FigurativeIterable(function next () {
var iterated = iterator.next();
var mapTriplet = iterated.value || invalidLabelMapTriplet;
var labels = mapTriplet[0];
var ns = mapTriplet[2];
var iteration = new LabelNamespaceIteration(ns, labelIterable(labels.values()));
return new FigurativeIterated(0|iterated.done, iteration);
});
}
function LabelNamespaceIteration (ns, iterable) {
this.ns = ns;
this.iterable = iterable;
}
function labelIterable (iterator) {
return new FigurativeIterable(function next () {
var iterated = iterator.next();
var labelPair = iterated.value || invalidLabelPair;
var iteration = new LabelIteration(labelPair[0]);
return new FigurativeIterated(0|iterated.done, iteration);
});
}
function LabelIteration (label) {
this.label = label;
//this.ns = label.ns();
//this.id = label.id();
}
return {
label: {
from: persistentLabel,
check: validateLabel,
channel: labelChannel.branch(barrierReadOnly, null, labelScanner, writeGuard, null),
},
};
}());
var ShuttleBus = (function ShuttleBus () {
var invalidNotifications = [function () {}];
var invalidTerminalTuple = terminalTuple(new Map(), "", 0, 0);
var invalidCallbackTuple = callbackTuple(invalidNotifications[0], 0);
var invalidBranchTuple = branchTuple(invalidNotifications[0], 0, 0);
var invalidStems = new Map();
var invalidResolver = invalidNotifications[0];
var invalidPromise = new Promise(function (resolve) {
invalidResolver = resolve;
});
var tuplePool = [null, invalidNotifications[0], null];
function claimTuplet (message, except, station) {
var tuple = tuplePool;
var limit = arguments.length;
var count = limit;
var index = -1;
if (tuplePool[0]) tuplePool = tuplePool[0];
else tuple = [null, invalidNotifications[0], station || null];
count = tuple.length > count ? tuple.length : count;
while (++index < count) tuple[index] = index < limit ? arguments[index] : null;
return tuple;
}
function abandonTuplet (tuple) {
var count = tuple.length;
var index = -1;
tuple[++index] = tuplePool;
tuple[++index] = invalidNotifications[0];
while (++index < count) tuple[index] = null;
tuplePool = tuple;
// TODO: consider halflife-style releasing of pooled resources
}
function Dispatch (type, route, strong, weak) {
this.type = type || "";
this.route = route || "";
this.strong = strong || 0;
this.weak = weak || 0;
}
var invalidDispatch = new Dispatch("", "", 0, 0);
function tupleAccessFactory (index) {
var idx = 0|Number(index);
return function accessTuple (tuple, young) {
var old = null;
old = tuple[idx];
if (arguments.length > 1) tuple[idx] = young;
return old;
}
}
var accessTerminal = tupleAccessFactory(0);
var accessTerminalRoute = tupleAccessFactory(1);
var accessTerminalStrong = tupleAccessFactory(2);
var accessTerminalWeak = tupleAccessFactory(3);
function terminalTuple (map, route, strong, weak) {
return [map, route, strong, weak];
}
var accessCallback = tupleAccessFactory(0);
var accessCallbackStrength = tupleAccessFactory(1);
var accessCallbackToken = tupleAccessFactory(2);
function callbackTuple (notify, strength, token) {
return [notify, strength, token];
}
var accessBranch = tupleAccessFactory(0);
var accessBranchInners = tupleAccessFactory(1);
var accessBranchOuters = tupleAccessFactory(2);
function branchTuple (notify, inners, outers) {
return [notify, inners, outers];
}
function Station (condemned, barrier, log, scanner) {
this._terminals = new Map();
this._destructor = invalidPromise;
this._demolish = invalidResolver;
this._barrier = null;
this._scanner = null;
this._log = null;
if (typeof barrier === "function") this._barrier = barrier;
if (typeof scanner === "function") this._scanner = scanner;
if (typeof log === "function") this._log = log;
if (condemned && typeof condemned.then === "function") this._destructor = condemned;
}
function shutter (condemned) {
var station = this;
var closed = 0|false;
return condemned.then(teardown, teardown);
function teardown (reason) {
if (closed) return 0|false;
else closed = 0|true;
var auth = null;
var has = 0|false;
var list = invalidNotifications;
var item = invalidNotifications[0];
var sextet = claimTuplet(list, item, station, "", null, auth);
var most = [invalidTerminalTuple];
station._destructor = null;
station._demolish = null;
station._terminals.forEach(collectEachTerminalTuple, most);
most = most.reverse();
most.pop();
most.forEach(closeEachTerminal, sextet);
has = 0|station._terminals.has("");
if (has) closeEachTerminal.call(sextet, station._terminals.get(""));
station._terminals.clear();
station._terminals = null;
//station._scanner = null; // leave the scanner
station._log = null;
station = null;
abandonTuplet(sextet);
return 0|true;
}
}
function collectEachTerminalTuple (tuple, route) {
var list = this;
if (route !== "") list.push(tuple);
}
function closeEachTerminal (tuple) {
var sextet = this;
var station = sextet[2];
var token = sextet[5];
var terminal = accessTerminal(tuple);
var route = accessTerminalRoute(tuple);
var services = [invalidCallbackTuple];
sextet[3] = route;
terminal.forEach(collectEachService, services);
services.forEach(stopEachService, sextet);
sextet[0] = invalidNotifications;
sextet[1] = invalidNotifications[0];
sextet[2] = station;
sextet[3] = route;
sextet[4] = null;
sextet[5] = token;
terminal.clear();
}
function collectEachService (bundle, notify) {
var list = this;
list.push(bundle);
}
function stopEachService (bundle) {
var sextet = this;
var station = sextet[2];
var route = sextet[3];
var token = sextet[5];
var notify = accessCallback(bundle);
var weakly = 0|true;
if (hasStrength(bundle, 1<<0)) egress.call(station, route, notify, 0|!weakly, "teardown", token);
if (hasStrength(bundle, 1<<1)) egress.call(station, route, notify, 0|weakly, "teardown", token);
sextet[0] = invalidNotifications;
sextet[1] = invalidNotifications[0];
sextet[2] = station;
sextet[3] = route;
sextet[4] = token;
}
Station.prototype.isInService = function () {
return 0|!!(this._terminals);
};
Station.prototype.ruin = function (token) {
var auth = arguments.length < fn.length ? null : token;
var dryRun = 0|true;
if (this._demolish === invalidResolver) this.obit(0|!dryRun, auth);
if (this._demolish === invalidResolver) return 0|false;
if (!this._demolish) return 0|false;
this._demolish();
return 0|true;
};
Station.prototype.obit = function fn (dryRun, token) {
var auth = arguments.length < fn.length ? null : token;
var station = this;
var future = this._destructor;
if (dryRun) return this._demolish === invalidResolver ? null : this._destructor;
else if (this._demolish === invalidResolver) {
if (this._barrier && (0, this._barrier)("obit", "", "", auth, this)) return null;
this._destructor = shutter.call(this, new Promise(function (resolve) {
station._demolish = resolve;
station = null;
}));
if (future !== invalidPromise) {
future.then(this._demolish, this._demolish);
future = invalidPromise;
}
}
return this._destructor;
};
Station.prototype.init = function (condemned, exempt, log, scanner) {
return new Station(condemned || null, exempt || null, log || null, scanner || null);
};
Station.prototype.scan = function fn (route, mode, token) {
var auth = arguments.length < fn.length ? null : token;
var kind = String(mode === null || mode === void 0 ? "" : mode);
//if (!this.isInService()) return null;
if (this._barrier && (0, this._barrier)("scan", kind, route, auth, this)) return null;
else if (this._scanner) return (0, this._scanner)(route, mode, this);
else return null;
};
function hasStrength (bundle, strength) {
return 0|!!(accessCallbackStrength(bundle) & strength);
}
function addStrength (bundle, strength) {
var old = accessCallbackStrength(bundle);
return 0|!(strength & accessCallbackStrength(bundle, old | strength));
}
function removeStrength (bundle, strength) {
var old = accessCallbackStrength(bundle);
return 0|!!(strength & accessCallbackStrength(bundle, old & ~strength));
}
Station.prototype.on = function fn (route, notify, token) {
var auth = arguments.length < fn.length ? null : token;
var WEAKLY = 0|true;
return ingress.call(this, route, notify, 0|!WEAKLY, "on", auth);
};
Station.prototype.watch = function fn (route, notify, token) {
var auth = arguments.length < fn.length ? null : token;
var WEAKLY = 0|true;
return ingress.call(this, route, notify, 0|WEAKLY, "watch", auth);
};
function ingress (route, notify, weakly, kind, token) {
if (!this.isInService()) return null;
if (this._barrier && (0, this._barrier)("ingress", kind, route, token, this)) return null;
var strength = 1 << (weakly ? 1 : 0);
var has = 0|this._terminals.has(route);
var tuple = has ? this._terminals.get(route) : terminalTuple(new Map(), route, 0, 0);
var terminal = accessTerminal(tuple);
var weak = 0;
var strong = 0;
var change = 0|true;
var dispatch = invalidDispatch;
var FORCE = 0|true;
if (!has) this._terminals.set(route, tuple);
if (terminal.has(notify)) change = 0|addStrength(terminal.get(notify), strength);
else terminal.set(notify, callbackTuple(notify, strength, route));
if (change) {
weak = accessTerminalWeak(tuple);
strong = accessTerminalStrong(tuple);
if (weakly) accessTerminalWeak(tuple, ++weak);
else accessTerminalStrong(tuple, ++strong);
if ((!this._barrier || !(0, this._barrier)("dispatch", kind, route, token, this)) &&
this._terminals.has("")) {
dispatch = new Dispatch(kind, route, strong, weak);
busMessage.call(this, 0|FORCE, "dispatch", "", dispatch, null, notify, token);
}
if (route === "") bootstrapDispatchMonitor.call(this, notify, kind, token);
}
return this;
};
Station.prototype.off = function fn (route, notify, token) {
var auth = arguments.length < fn.length ? null : token;
var WEAKLY = 0|true;
return egress.call(this, route, notify, 0|!WEAKLY, "off", auth);
};
Station.prototype.ignore = function fn (route, notify, token) {
var auth = arguments.length < fn.length ? null : token;
var WEAKLY = 0|true;
return egress.call(this, route, notify, 0|WEAKLY, "ignore", auth);
};
function egress (route, notify, weakly, kind, token) {
if (!this.isInService()) return null;
if (this._barrier && (0, this._barrier)("egress", kind, route, token, this)) {
if (kind !== "teardown") return null; // ignore barrier check when mid-teardown
}
var strength = 1 << (weakly ? 1 : 0);
var tuple = this._terminals.get(route) || invalidTerminalTuple;
var terminal = accessTerminal(tuple);
var bundle = invalidCallbackTuple;
var weak = 0;
var strong = 0;
var change = 0|true;
var dispatch = invalidDispatch;
var FORCE = 0|true;
if (tuple !== invalidTerminalTuple &&
terminal.has(notify)) {
bundle = terminal.get(notify);
if (!removeStrength(bundle, strength)) change = 0|false;
else if (!hasStrength(bundle, -1)) terminal.delete(notify);
if (!terminal.size) this._terminals.delete(route);
if (change) {
weak = accessTerminalWeak(tuple);
strong = accessTerminalStrong(tuple);
if (weakly) accessTerminalWeak(tuple, --weak);
else accessTerminalStrong(tuple, --strong);
if ((!this._barrier || !(0, this._barrier)("dispatch", kind, route, token, this)) &&
this._terminals.has("")) {
dispatch = new Dispatch(kind, route, strong, weak);
busMessage.call(this, 0|FORCE, "dispatch", "", dispatch, null, null, token);
}
}
}
return this;
};
Station.prototype.shuttle = function fn (route, message, exclusions, except, token) {
var auth = arguments.length < fn.length ? null : token;
var list = Array.isArray(exclusions) ? exclusions : invalidNotifications;
var item = except || invalidNotifications[0];
var FORCE = 0|true;
return busMessage.call(this, 0|!FORCE, "shuttle", route, message, list, item, auth);
};
//Station.prototype.schedule = function fn (route, message, exclusions, except, token) {
// var auth = arguments.length < fn.length ? null : token;
// var list = Array.isArray(exclusions) ? exclusions : invalidNotifications;
// var item = except || invalidNotifications[0];
// var FORCE = 0|true;
// return busMessage.call(this, 0|!FORCE, "schedule", route, message, list, item, auth);
//};
function busMessage (force, kind, route, message, exclusions, except, token) {
if (!this.isInService()) return null;
if (this._barrier && (0, this._barrier)("bus", kind, route, token, this)) {
if (!force) return null;
}
var tuple = this._terminals.get(route) || invalidTerminalTuple;
var terminal = accessTerminal(tuple);
var list = Array.isArray(exclusions) ? exclusions : invalidNotifications;
var item = except || invalidNotifications[0];
var sextet = claimTuplet(list, item, this, route, message, token);
if (this._log) (0, this._log)(message, route, this);
if (tuple !== invalidTerminalTuple) terminal.forEach(busEachMessage, sextet);
abandonTuplet(sextet);
return this;
};
function busEachMessage (tuple, notify) {
var sextet = this;
var exclusions = sextet[0];
var except = sextet[1];
var station = sextet[2];
var route = sextet[3];
var message = sextet[4];
var token = sextet[5];
if (exclusions.indexOf(notify) > -1) return;
else if (exclusions === invalidNotifications && notify === except) return;
else notify(message, route, station, accessCallbackToken(tuple));
sextet[0] = exclusions;
sextet[1] = except;
sextet[2] = station;
sextet[3] = route;
sextet[4] = message;
sextet[5] = token;
};
function bootstrapDispatchMonitor (notify, kind, token) {
var sextet = claimTuplet(invalidNotifications, notify, this, kind, null, token);
var tuple = this._terminals.get("");
var strong = 0;
var weak = 0;
var type = ""; // not "on" nor "off" for bootstrapping
// no logging -- this is an internally provided "scanning" mechanism
// (it is history rather than activity)
this._terminals.forEach(dispatchEachExcept, sextet);
if (!this._barrier || !(0, this._barrier)("dispatch", kind, "", token, this)) {
strong = accessTerminalStrong(tuple);
weak = accessTerminalWeak(tuple);
notify(new Dispatch(type, "", strong, weak), "", this, token);
}
abandonTuplet(sextet);
}
function dispatchEachExcept (tuple, route) {
var type = ""; // not "on" nor "off" for bootstrapping
var sextet = this;
var notify = sextet[1];
var station = sextet[2];
var kind = sextet[3];
var token = sextet[5];
var strong = 0;
var weak = 0;
if (route === "") return;
if (station._barrier && (0, station._barrier)("dispatch", kind, route, token, station)) return;
strong = accessTerminalStrong(tuple);
weak = accessTerminalWeak(tuple);
notify(new Dispatch(type, route, strong, weak), "", station, token);
sextet[0] = invalidNotifications;
sextet[1] = notify;
sextet[2] = station;
sextet[3] = kind;
sextet[4] = null;
sextet[5] = token;
}
function derouter (station, route, connect, weakly, token) {
return function () {
if (weakly) station.ignore(route, connect, token);
else station.off(route, connect, token);
};
}
Station.prototype.reroute = function detour (fromRoute, toRoute, token) {
var auth = arguments.length < detour.length ? null : token;
var weakly = 0|false;
if (this.on(fromRoute, connect, auth)) weakly = 0|true;
else this.watch(fromRoute, connect, auth);
return derouter(this, fromRoute, connect, weakly, auth);
function connect (msg, route, station) {
station.shuttle(toRoute, msg, null, null, auth);
}
};
Station.prototype.branch = function fn (exempt, log, scanner, outerToken, innerToken) {
var authOuter = arguments.length < fn.length - 1 ? null : outerToken;
var authInner = arguments.length < fn.length ? null : innerToken;
var DRY_RUN = 0|true;
var stems = new Map();
var inner = this;
var outer = inner.init(inner.obit(0|!DRY_RUN, authInner), exempt || null, log || null, scanner || null);
outer.obit(0|!DRY_RUN, authOuter).then(function () {
var branches = stems;
var exclusions = invalidNotifications;
var except = invalidNotifications[0];
var sextuple = claimTuplet(exclusions, exclude, outer, "", null, authOuter);
var sextet = claimTuplet(exclusions, except, inner, "", sextuple, authInner);
stems = invalidStems;
outer.off("", synchronize, authOuter);
inner.off("", synchronize, authInner);
inner = outer;
if (branches !== invalidStems) {
branches.forEach(clearEachBranch, sextet);
branches.clear();
}
abandonTuplet(sextet);
abandonTuplet(sextuple);
});
outer.on("", synchronize, authOuter);
inner.on("", synchronize, authInner);
return outer;
function synchronize (dispatch, route, station) {
var strong = dispatch.strong;
var rte = dispatch.route;
var has = 0|false;
var branch = invalidBranchTuple;
var notifiers = accessBranch(branch);
if (stems !== invalidStems) switch (dispatch.type) {
case "": // bootstrap
case "on":
case "off": if (rte !== "") {
has = 0|stems.has(rte);
if (strong > 0) {
if (has) branch = stems.get(rte);
else branch = branchTuple(stemRoute(inner, outer, authInner, authOuter, rte), 0, 0);
if (!has) stems.set(rte, branch);
if (station === inner) accessBranchInners(branch, strong);
else if (station === outer) accessBranchOuters(branch, strong);
} else if (has) {
branch = stems.get(rte);
notifiers = accessBranch(branch);
if (station === inner) {
if (accessBranchOuters(branch) < 1) {
stems.delete(pruneRoute(inner, outer, authInner, authOuter, rte, notifiers));
} else accessBranchInners(branch, strong);
} else if (station === outer) {
if (accessBranchInners(branch) < 1) {
stems.delete(pruneRoute(inner, outer, authInner, authOuter, rte, notifiers));
} else accessBranchOuters(branch, strong);
}
}
} break;
}
}
};
function stemRoute (inner, outer, authInner, authOuter, rte) {
notifyInner.prototype = notifyOuter;
notifyOuter.prototype = notifyInner;
inner.watch(rte, notifyOuter, authInner);
outer.watch(rte, notifyInner, authOuter);
return notifyInner;
function notifyInner (msg, route) {
inner.shuttle(route, msg, null, notifyInner.prototype, authInner);
}
function notifyOuter (msg, route) {
outer.shuttle(route, msg, null, notifyOuter.prototype, authOuter);
}
}
function pruneRoute (inner, outer, authInner, authOuter, rte, notifyInner) {
if (!notifyInner) return rte;
var notifyOuter = notifyInner.prototype;
inner.ignore(rte, notifyOuter, authInner);
outer.ignore(rte, notifyInner, authOuter);
return rte;
}
function clearEachBranch (branch, rte) {
var sextet = this;
var sextuple = sextet[4];
var inner = sextet[2];
var authInner = sextet[5];
var outer = sextuple[2];
var authOuter = sextuple[5];
pruneRoute(inner, outer, authInner, authOuter, rte, accessBranch(branch));
sextet[0] = invalidNotifications;
sextet[1] = invalidNotifications[0];
sextet[2] = inner;
sextet[3] = rte; // "";
sextet[4] = sextuple;
sextet[5] = authInner;
sextuple[0] = invalidNotifications;
sextuple[1] = invalidNotifications[0];
sextuple[2] = outer;
sextuple[3] = rte; // "";
sextuple[4] = null;
sextuple[5] = authOuter;
}
return new Station(null, null, null, null); // TODO: provide default log/scan methods
}());
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment