Skip to content

Instantly share code, notes, and snippets.

@aphyr
Created December 19, 2019 20:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aphyr/d2e7efdc1f533d08c665c24b1f92d2e2 to your computer and use it in GitHub Desktop.
Save aphyr/d2e7efdc1f533d08c665c24b1f92d2e2 to your computer and use it in GitHub Desktop.
(defn converger
"Generates a convergence context for n threads, where values are converged
when (converged? values) returns true."
[n converged?]
(atom {; The convergence function
:converged? converged?
; What threads are involved?
:threads []
; And what values did they most recently come to?
:values (vec (repeat n ::init))}))
(defn await-converger-change
"Wait for a converger's state to change. Might return for no reason."
[c]
(LockSupport/park c))
(defn signal-converger-change!
"Let a converger know that its state has changed."
[c]
(doseq [t (:threads @c)]
(LockSupport/unpark t)))
(defn stable?
"A converger state is stable when no value is initial or evolving."
[c]
(not-any? #{::init ::evolving} (:values c)))
(defn divergent?
"A converger state is divergent when any values are initial, or some
non-evolving values don't pass the convergence test."
[c]
(or (some #{::init} (:values c))
(let [vs (remove #{::evolving} (:values c))]
(and (seq vs)
(not ((:converged? c) vs))))))
(defn converged?
"A converger state is converged when it is stable and non-divergent."
[c]
(and (stable? c)
(not (divergent? c))))
(defn evolve!
"Takes a converger, an evolve function, an initial value, and a thread index
i. Takes the previous value for this thread (or the initial value), clears it
from the converger, and calls (evolve value) to generate value', finally
updating the converger with value'."
[converger evolve init i]
(let [value (atom nil)]
; Get the current value and clear it from the converger
(swap! converger (fn [c]
(let [v (nth (:values c) i)
v (if (= ::init v) init v)]
(reset! value v))
(assoc-in c [:values i] ::evolving)))
; Evolve
(let [value' (evolve @value)]
(swap! converger #(assoc-in % [:values i] value'))
(info @value '-> value'))
; Let other threads know the values have changed
(signal-converger-change! converger)
nil))
(defn converge!
"Takes a converger, an initial value, and a function which evolves values
over time.
When `converge` is called, evaluates `evolve` repeatedly, starting with the
initial value; each successive invocation receives the result of the previous
invocation. Returns a value only when (converged? [v1 v2 ...]) returns
truthy.
Always invokes evolve at least once; the initial state doesn't count as
converged.
Example:"
[converger init evolve]
; Acquire our unique thread index
(let [i (-> (swap! converger update :threads conj (Thread/currentThread))
:threads
count
dec)]
(loop []
(cond ; If we're converged, return our value
(converged? @converger)
(-> converger deref :values (get i))
; If we're divergent, evolve
(divergent? @converger)
(do (info i "evolving")
(evolve! converger evolve init i)
(recur))
; Otherwise, wait for something to change
true
(do (info i "waiting")
(await-converger-change converger)
(recur))))))
15:25:34.414 [real-pmap 0] INFO jepsen.watch - 0 evolving
15:25:34.414 [real-pmap 1] INFO jepsen.watch - 2 evolving
15:25:34.414 [real-pmap 2] INFO jepsen.watch - 1 evolving
15:25:34.425 [real-pmap 0] INFO jepsen.watch - [0] -> [0 1]
15:25:34.425 [real-pmap 1] INFO jepsen.watch - [1] -> [1 1]
15:25:34.425 [real-pmap 2] INFO jepsen.watch - [2] -> [2 0]
15:25:34.426 [real-pmap 2] INFO jepsen.watch - 1 evolving
15:25:34.426 [real-pmap 1] INFO jepsen.watch - 2 evolving
15:25:34.426 [real-pmap 0] INFO jepsen.watch - 0 evolving
15:25:34.426 [real-pmap 1] INFO jepsen.watch - [1 1] -> [1 1 0]
15:25:34.426 [real-pmap 0] INFO jepsen.watch - [0 1] -> [0 1 1]
15:25:34.426 [real-pmap 1] INFO jepsen.watch - 2 evolving
15:25:34.426 [real-pmap 0] INFO jepsen.watch - 0 evolving
15:25:34.426 [real-pmap 1] INFO jepsen.watch - [1 1 0] -> [1 1 0 1]
15:25:34.426 [real-pmap 0] INFO jepsen.watch - [0 1 1] -> [0 1 1 1]
15:25:34.426 [real-pmap 1] INFO jepsen.watch - 2 waiting
15:25:34.426 [real-pmap 0] INFO jepsen.watch - 0 waiting
15:25:34.426 [real-pmap 1] INFO jepsen.watch - 2 waiting
15:25:34.427 [real-pmap 2] INFO jepsen.watch - [2 0] -> [2 0 1]
:vs [[0 1 1 1] [1 1 0 1] [2 0 1]]
Ran 1 tests containing 2 assertions.
0 failures, 0 errors.
(deftest converge-test
; We're going to append random numbers to arrays until their final number is the same
(let [n 3
c (converger n (fn [colls] (apply = (map peek colls))))
vs (dt/real-pmap (fn [i]
(converge! c [i] (fn [coll]
(Thread/sleep (rand-int 2))
(conj coll (rand-int 2)))))
(range n))]
(prn :vs vs)
; Starts with initial values
(is (= (range n) (map first vs)))
; Ends with same value
(is (apply = (map peek vs)))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment