Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
A simple election mechanism for Datomic peers
;; This is a relatively simple election mechanism for a set of
;; processes or datomic peers where a processing step requires a
;; global master. The elector entity is used to connect a set of peer
;; entities labeled as :election/master. Using the algorithm,
;; there will only ever be one :election/master in the :election/peer
;; group defined on the elector.
;;
;; The user API is simply a guarded transaction which calls a txn-fn
;; if the entity is the current master. If not, it holds an election
;; if the timeout has expired.
;;
;; Each peer calls reset-elector! on startup to ensure it is on the
;; peer election list. Peers should have identical durations. Peers
;; can immediately start running guarded transactions. If new nodes
;; come online and reset the election, it simply delays the time until
;; the work completes, each unit of work will complete exactly once if
;; the query to grab the work drives a consistent transaction.
;;
;; The timeout is used for fail over. If I win an election, then die,
;; my txn-fn will either not be called or the results will not be
;; committed. If txn-fn is purely functional against a dbval, if the
;; master dies, a new master will be elected shortly, run the same query
;; and pick up work not completed by the now dead master.
;;
;; This assumes tasks run guarded transactions on intervals of approximately
;; size S consistently across the peer group and the master election duration is
;; an interval of size P where P >> S.
;;
;; Max time to first commit is: S * peers
;; Max time to fail over is: P + S
;;
;; This algorithm allows for unbalanced work distribution if all peers run
;; on a time interval of strictly S and S is the right multiple of P (the same
;; master might always win).
(defn- get-elector
  "Look up the elector entity whose :election/name equals 'name' in the
  database value 'db'. Returns whatever data/find-entity returns."
  [db name]
  (-> db
      (data/find-entity :election/name name)))
(defn- expired-election?
  "True when the elector's :election/ts instant lies strictly before the
  current wall-clock time, i.e. the current master's term has run out."
  [elector]
  (let [deadline-ms (.getTime (:election/ts elector))
        now-ms      (System/currentTimeMillis)]
    (< deadline-ms now-ms)))
(defn- elect!
  "Using elector named 'name', submit a transaction that tries to elect
  'entity' as master via the :user.fn/electForDuration database function.
  Returns the result of p/commit! (callers in this file treat it as a
  truthy transaction report)."
  [dbpeer name entity]
  (let [candidate-id (:db/id entity)
        election-tx  [[:user.fn/electForDuration name candidate-id]]]
    (p/commit! dbpeer election-tx)))
(defn- master?
  "Return the value of the :election/master? flag as seen on the given
  entity (nil when the attribute is absent)."
  [entity]
  (get entity :election/master?))
(defn- true-master?
  "Formally, entity is a master under election named 'name' if elector
  ts > now and (= (:election/master? entity) true).

  Evaluates the stored :user.fn/electionMaster? database function against
  'dbval'. d/invoke takes the database used to look the function up as its
  first argument; the stored function itself declares [db name eid], so
  'dbval' is passed a second time as the function's own db argument."
  [dbval name entity]
  (d/invoke dbval :user.fn/electionMaster? dbval name (:db/id entity)))
(defn- assert-master-txn
  "Build a one-statement transaction that ensures 'entity' is a master
  under election 'name' during the transaction commit.

  The :user.fn/assertElectionMaster database function takes an entity id,
  so the entity's :db/id is passed (matching how elect! and true-master?
  call their database functions) rather than the entity itself."
  [name entity]
  [[:user.fn/assertElectionMaster name (:db/id entity)]])
(defn- elected?
  "Decide whether 'entity' may act as master for election 'name'.
  If the elector's term has expired an election is attempted and its
  (truthy) result is returned; otherwise, or when that attempt yields a
  falsey value, fall back to the master flag carried on 'entity'."
  [dbpeer name entity]
  {:pre [(data/entity? entity) (keyword? name)]}
  (let [elector  (get-elector (p/get-db dbpeer) name)
        election (when (expired-election? elector)
                   (elect! dbpeer name entity))]
    (if election
      election
      (master? entity))))
;;
;; API: Create an elector w/ duration and peer set
;;
(defn reset-elector!
  "Reset the state of the named elector to the beginning of duration
  and return a fresh entity representing the elected master.

  dbpeer  - connection/peer handle accepted by p/commit!
  name    - keyword naming the elector (:election/name)
  seconds - term length in seconds; stored as milliseconds
  entity  - the peer entity to register on the elector's :election/peer set

  Steps: write the elector with :election/ts set to now (so an election can
  be held right away), register 'entity' as a peer, run an election for
  'entity', then query the post-election database for the peer flagged
  :election/master? under this elector."
  [dbpeer name seconds entity]
  {:pre [(data/entity? entity) (number? seconds) (keyword? name)]}
  (let [eid (data/tempid)]
    (p/commit!
     dbpeer
     [{:db/id eid
       :election/name name
       :election/ts (java.util.Date.)
       ;; coerce so fractional or non-long 'seconds' still fit the :long attribute
       :election/duration (long (* 1000 seconds))}
      [:db/add eid :election/peer (:db/id entity)]])
    (let [report (elect! dbpeer name entity)]
      (data/query-entity '[:find ?eid :in $ ?name :where
                           [?elector :election/name ?name]
                           [?elector :election/peer ?eid]
                           [?eid :election/master? true]]
                         (:db-after report) name))))
(defn guarded-commit!
  "Run txn-fn on args and commit its result only when the provided
  entity is the elected master for 'name' -- either because it already
  is, or because it wins a new election held here. The committed
  transaction is prefixed with a master assertion so the commit stays
  consistent under race conditions. Timeouts are intended to be human
  time (many seconds to minutes, not ms). Returns nil when not elected."
  [dbpeer [name entity] txn-fn & args]
  (when (elected? dbpeer name entity)
    (let [guard (assert-master-txn name entity)
          work  (apply txn-fn args)]
      (p/commit! dbpeer (into (vec guard) work)))))
(defn speculative-commit!
  "Guarded commit without holding an election: when 'entity' carries the
  master flag, call txn-fn on args (it must return a valid transaction)
  and commit it behind a master assertion. Costs extra txn-fn calls on
  peers whose flag is stale; useful if the election step happens
  elsewhere. Returns nil when the entity is not flagged as master."
  [dbpeer [name entity] txn-fn & args]
  (when (master? entity)
    (let [guard (assert-master-txn name entity)
          work  (apply txn-fn args)]
      (p/commit! dbpeer (into (vec guard) work)))))
;; Shorthand schema rows for the election attributes. Each row is
;; [attribute-ident value-type option]; the code that expands these rows
;; into full attribute definitions is not part of this file.
;; NOTE(review): :election/peer is iterated as a collection by
;; :user.fn/electForDuration, so it presumably needs cardinality-many --
;; confirm how the shorthand expander sets cardinality.
(def shorthand-attributes
;; Election attributes
;; name     - keyword used to look the elector up
;; ts       - instant compared against the current time to decide expiry
;; duration - term length in milliseconds (reset-elector! stores seconds * 1000)
;; peer     - reference(s) from the elector to participating peer entities
;; master?  - flag on a peer entity marking it as the current master
[[:election/name :keyword :unique-identity]
[:election/ts :instant :noHistory]
[:election/duration :long :noHistory]
[:election/peer :ref :noHistory]
[:election/master? :boolean :noHistory]])
;; Database functions implementing the election protocol.
;;
;;   :user.fn/add-new              - txn fn: :db/add unless the datom exists
;;   :user.fn/electForDuration     - txn fn: if the elector's term has expired,
;;                                   start a new term and flag 'meid' as the
;;                                   only master among the elector's peers
;;   :user.fn/electionMaster?      - predicate: is 'eid' the master of an
;;                                   unexpired term of elector 'name'?
;;   :user.fn/assertElectionMaster - txn fn: abort the transaction unless
;;                                   'eid' is currently the master
;;
;; datomic.api vars are fully qualified inside the quoted function bodies so
;; they resolve without depending on aliases in the namespace the function
;; code is compiled in.
(def ^:private data-fns
  [{:db/id (d/tempid :part/app)
    :db/ident :user.fn/add-new
    :db/doc "Like :db/add but has no effect if an identical assertion
is already present in the database."
    :db/fn (d/function
            '{:lang :clojure
              :params [db e a v]
              :code (when (empty? (datomic.api/q '[:find ?e :in $ ?e ?a ?v
                                                   :where [?e ?a ?v]]
                                                 db e a v))
                      [[:db/add e a v]])})}
   {:db/id (d/tempid :part/app)
    :db/ident :user.fn/electForDuration
    :db/doc "Supports a trivial election algorithm which periodically
and atomically chooses the first master to run the election
after a timeout occurs. See infra/election.clj"
    :db/fn (d/function
            '{:lang :clojure
              :params [db name meid]
              :code (let [;; only $ and ?name are supplied, so :in declares
                          ;; exactly those two inputs
                          [eid inst duration]
                          (first
                           (datomic.api/q '[:find ?elector ?inst ?dur :in $ ?name :where
                                            [?elector :election/name ?name]
                                            [?elector :election/duration ?dur]
                                            [?elector :election/ts ?inst]]
                                          db name))
                          elector (datomic.api/entity db eid)
                          ts (.getTime inst)
                          now (.getTime (java.util.Date.))]
                      (if (< ts now) ;; winner?
                        (vec
                         (concat
                          ;; start a new term ending 'duration' ms from now
                          [[:db/add eid :election/ts
                            (java.util.Date. (+ now duration))]]
                          ;; flag the caller as master, every other peer as not
                          (mapv (fn [peer]
                                  (let [id (:db/id peer)]
                                    [:user.fn/add-new id :election/master?
                                     (boolean (= id meid))]))
                                (:election/peer elector))))
                        ;; term still running: no changes
                        []))})}
   {:db/id (d/tempid :part/app)
    :db/ident :user.fn/electionMaster?
    :db/doc "Non-txn function that tests whether eid is a master
according to elector 'name' for the current value of the db"
    :db/fn (d/function
            '{:lang :clojure
              :params [db name eid]
              :code (let [now (.getTime (java.util.Date.))]
                      (boolean
                       (ffirst
                        ;; the current time is passed in as ?now because query
                        ;; expression clauses cannot contain nested calls
                        (datomic.api/q '[:find ?master :in $ ?name ?eid ?now :where
                                         [?elector :election/name ?name]
                                         [?elector :election/ts ?ts]
                                         [(.getTime ^java.util.Date ?ts) ?expires]
                                         ;; the term is valid while its end is in the future
                                         [(< ?now ?expires)]
                                         [?elector :election/peer ?eid]
                                         [?eid :election/master? ?master]]
                                       db name eid now))))})}
   {:db/id (d/tempid :part/app)
    :db/ident :user.fn/assertElectionMaster
    :db/doc "The entity eid must be the master of elector 'name' at commit
time for this to be a valid transaction, otherwise abort"
    :db/fn (d/function
            '{:lang :clojure
              :params [db name eid]
              :code (if (datomic.api/invoke db :user.fn/electionMaster? db name eid)
                      []
                      ;; throwing inside a transaction function aborts the transaction
                      (throw
                       (ex-info (str "Assertion failed: entity " eid
                                     " is not the master for election " name)
                                {:election/name name
                                 :election/eid eid})))})}])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment