Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Simple reservation for multiple worker peers using Datomic
(ns experiment.infra.election
(:use [datomic.api :only [q] :as d])
(:require [experiment.system :as sys]
[experiment.infra.data :as data]
[experiment.infra.protocols :as p]))
;;
;; Reserve - Simple peer coordination mechanism
;;
;; Reservations support multiple peers trying to distribute operations
;; across a cluster. As peers may go down or have problems, we don't
;; want to lose activity so we support reservation timeouts. Datomic
;; makes it trivial to perform atomic state transitions so we can simply
;; perform a synchronous transaction to checkout a group of objects.
;; It's also easy to do an asynch transaction and use a tx-queue watcher
;; to process the results, but this code doesn't support that.
;;
;; Peers should share identical query and timeout conventions to properly
;; coordinate.
;;
;; This code assumes :reserve/state (a :db.type/keyword) and :reserve/ts
;; (a :db.type/instant) are part of the Datomic schema. They should probably
;; be marked :db/noHistory if reservations are high volume (and forensics
;; aren't important or are done elsewhere).
;;
;; State ':ready' - The entity is available to be reserved if it matches
;; the shared query.
;;
;; State ':reserved' - The entity is reserved and being worked on by a
;; single peer (you know who you are). At the end of the work, the state
;; should be returned to :ready or to any other state to avoid timeouts.
;; On the other hand, timeouts are a cheap way to do roughly periodic tasks.
;;
;; State 'any' - The reservation system ignores all other states.
;;
;; (reserve <peer> <N> <entities> <timeout>) -- Given the candidate list
;; in entities, try to reserve all or N of them.
;;
;; (release-reservation <peer> <entities>) -- Release the reservation
;; state on the entities, returning them to :ready.
;;
;; (enable-reservations <peer> <entities>) -- Initialize entities to :ready
;;
;; Functions consume entities or eids and return eids for downstream
;; flexibility.
;;
(defn- as-eid
  "Coerce `entity` to an entity id. Numbers are returned unchanged; for
  anything else the value stored under its :db/id key is returned (nil
  when there is none)."
  [entity]
  (if-not (number? entity)
    (:db/id entity)
    entity))
;; Transaction data for the reservation mechanism: the :reserve/state and
;; :reserve/ts attributes plus the :user.fn/reserve database function.
(def ^:private reservation-schema
  [;; Maintain reservation state, no history
   {:db/id (d/tempid :db.part/db)
    :db/ident :reserve/state
    :db/valueType :db.type/keyword
    :db/cardinality :db.cardinality/one
    :db/noHistory true
    :db.install/_attribute :db.part/db}
   ;; Maintain the last reservation time for timeouts
   {:db/id (d/tempid :db.part/db)
    :db/ident :reserve/ts
    :db/valueType :db.type/instant
    :db/cardinality :db.cardinality/one
    :db/noHistory true
    :db.install/_attribute :db.part/db}
   ;; Function to atomically reserve N objects, including timed out ones.
   ;; NOTE(review): the :db.part/app partition is not installed by this
   ;; vector -- presumably it is created by the application schema; confirm.
   {:db/id (d/tempid :db.part/app)
    :db/ident :user.fn/reserve
    :db/doc
    "
(reserve <db> <N> <entities> <ts> <timeout>) --
Uses :reserve/state and :reserve/ts attributes to implement an atomic grab of
a set of candidate entity ids. Callers can use the :db-before and :db-after to
detect the set of entities that transitioned in that transaction. When done
with the entities, simply assert the :ready state on :reserve/state or allow
it to timeout naturally.
"
    :db/fn
    (d/function
     '{:lang :clojure
       :params [db n entities ts timeout]
       :code
       ;; A candidate is "ready" when it is either in the :ready state, or
       ;; in the :reserved state with a :reserve/ts older than `timeout`
       ;; relative to `ts` (both compared via java.util.Date.getTime, i.e.
       ;; in milliseconds).
       (let [rules '[[(ready? ?object ?now ?timeout)
                      [?object :reserve/state :ready]]
                     [(ready? ?object ?now ?timeout)
                      [?object :reserve/state :reserved]
                      [?object :reserve/ts ?ts]
                      [(.getTime ?now) ?nowt]
                      [(.getTime ?ts) ?tst]
                      [(- ?nowt ?tst) ?diff]
                      [(> ?diff ?timeout)]]]]
         ;; Query inputs are matched to :in positionally, so the rules must
         ;; be passed second (the % slot), right after the database.
         ;; q is fully qualified so that resolving it does not depend on a
         ;; refer/alias being present where the function is evaluated.
         (->> (datomic.api/q '[:find ?eid
                               :in $ % [?eid ...] ?ts ?timeout
                               :where
                               (ready? ?eid ?ts ?timeout)]
                             db rules entities ts timeout)
              ;; unwrap the one-element result tuples
              (map first)
              ;; claim at most n of the ready candidates
              (take n)
              ;; each winner is marked :reserved and stamped with ts
              (mapcat (fn [eid]
                        [[:db/add eid :reserve/state :reserved]
                         [:db/add eid :reserve/ts ts]]))))})}])
(defn- get-reserve-fn
  "Query `dbval` for the entity whose :db/ident is :user.fn/reserve.
  Returns the first result tuple of that query (a one-element tuple holding
  the entity id), or nil when the query finds nothing."
  [dbval]
  (let [matches (q '[:find ?fn
                     :where [?fn :db/ident :user.fn/reserve]]
                   dbval)]
    (first matches)))
(defn ensure-reserve-fn
  "Install the reservation schema (the :reserve/state and :reserve/ts
  attributes and the :user.fn/reserve database function) through `peer`,
  unless :user.fn/reserve can already be found in the peer's current
  database. Returns the result of p/commit! when the schema was committed,
  nil otherwise."
  [peer]
  (when (empty? (get-reserve-fn (p/get-db peer)))
    ;; reservation-schema is the transaction data defined in this namespace
    (p/commit! peer reservation-schema)))
(defn- reserved-entities
  "Given a transaction `report`, return the query result naming every entity
  id that appears in the report's :tx-data and whose :reserve/state is
  :reserved in the report's :db-after database. Each element of the result
  is a one-element tuple holding an entity id."
  [report]
  (let [after  (:db-after report)
        datoms (:tx-data report)]
    (q '[:find ?eid
         :in $ [[?eid ?a ?v ?tx] ...]
         :where [?eid :reserve/state :reserved]]
       after
       datoms)))
(defn reserve
  "Try to reserve up to N of the candidate `entities` (entities or eids) by
  transitioning the winners to :reserved in a single transaction that
  invokes the :user.fn/reserve database function.

  Arguments:
    peer     - the peer the transaction is committed through (p/commit!)
    N        - maximum number of entities to reserve; defaults to
               (count entities). nil or a non-positive N reserves nothing.
    entities - candidate entities or entity ids
    timeout  - age, compared against the difference of java.util.Date
               getTime values (milliseconds), after which an entity that is
               still :reserved may be taken over. nil means reserved
               entities are never taken over.

  Returns the result of querying the transaction report for the entities
  that ended up :reserved (one-element tuples of entity ids), or [] when
  there was nothing to attempt."
  ([peer entities timeout]
   (reserve peer (count entities) entities timeout))
  ([peer N entities timeout]
   {:pre [(or (nil? N) (number? N))
          (or (nil? timeout) (and (number? timeout) (> timeout 0)))]}
   (if (and N (pos? N) (pos? (count entities)))
     (reserved-entities
      (p/commit! peer
                 [[:user.fn/reserve
                   N
                   (map as-eid entities)
                   (java.util.Date.)
                   ;; The database function evaluates (> ?diff ?timeout),
                   ;; which cannot compare against nil. Substituting the
                   ;; largest long makes that test always false, so a nil
                   ;; timeout only ever claims :ready entities.
                   (or timeout Long/MAX_VALUE)]]))
     [])))
(defn release-reservation-tx
  "Build the transaction data that puts `entity` (an entity or an eid) back
  into the :ready state and sets its :reserve/ts to `ts`. With a single
  argument, `ts` is the current time."
  ([entity]
   (release-reservation-tx (java.util.Date.) entity))
  ([ts entity]
   (let [target     (as-eid entity)
         mark-ready [:db/add target :reserve/state :ready]
         stamp      [:db/add target :reserve/ts ts]]
     [mark-ready stamp])))
(defn release-reservation
  "Make reserved entities grabbable again by committing, through `peer`, a
  transaction that sets their state to :ready and their ts to now.
  `entities` may be a sequential collection of entities/eids or a single
  entity/eid."
  [peer entities]
  (let [tx-data (if (sequential? entities)
                  (mapcat release-reservation-tx entities)
                  (release-reservation-tx entities))]
    (p/commit! peer tx-data)))
(defn enable-reservations
  "Ensure provided entities are ready for reservation: commits, through
  `peer`, a transaction setting each one's :reserve/state to :ready and its
  :reserve/ts to the current time. `entities` may be a sequential collection
  of entities/eids or a single entity/eid."
  [peer entities]
  ;; Initialising an entity is the same transaction as releasing it.
  (release-reservation peer entities))
;; Test Reservation Mechanism
(defn create-object
  "Test helper: commit, through `peer`, one new entity of :model/type :test
  whose :model/id is the string form of `name` and whose reservation state
  starts out as :ready."
  [peer name]
  (let [object {:db/id (data/tempid)
                :model/id (str name)
                :model/type :test
                :reserve/state :ready}]
    (p/commit! peer [object])))
;; End-to-end check of the reservation cycle against a temporary peer:
;; reserve a subset, reserve the remainder, release everything, re-reserve,
;; then take over reservations that have timed out.
;; NOTE(review): the e/, schema/ aliases and with-temporary-peer / deftest / is
;; are not declared in the visible source -- presumably this form lives in a
;; separate test namespace; confirm.
(deftest simple-reservation
(with-temporary-peer [peer]
(schema/ensure-schema (d/connect (:uri peer)) (p/get-db peer))
(e/ensure-reserve-fn peer)
;; Create ten test objects, each starting in the :ready state
(doall (map (partial create-object peer) (range 10)))
(let [candidates (data/find-entities (p/get-db peer) :model/type :test)
;; First grab: at most 5 of the 10 candidates, with a timeout of 2000
reserved1 (e/reserve peer 5 candidates 2000)]
(is (= (count candidates) 10))
(is (= (count reserved1) 5))
;; Reserve a second set
(let [reserved2 (e/reserve peer 10 candidates 2000)]
;; Not the same as the first!
(is (= (count (clojure.set/intersection (set reserved1) (set reserved2))) 0))
;; Only the 5 candidates not taken by the first grab are available
(is (= (count reserved2) 5))
;; Everything is now reserved, so a third grab gets nothing
(is (= (count (e/reserve peer 10 candidates 2000)) 0)))
;; Releasing and re-reserve the set
(e/release-reservation peer candidates)
;; Asking for 20 can still only yield the 10 existing candidates
(is (= (count (e/reserve peer 20 candidates 2000)) 10))
;; Then let them timeout
;; Sleep 200 ms so the reservations are older than the 100 used as the
;; timeout below, which lets all 10 be reserved again
(java.lang.Thread/sleep 200)
(is (= (count (e/reserve peer 10 candidates 100)) 10)))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment