Created
December 19, 2019 20:27
-
-
Save aphyr/d2e7efdc1f533d08c665c24b1f92d2e2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(defn converger
  "Builds the shared state used to converge n threads. Returns an atom holding
  a map with the caller-supplied convergence predicate, the vector of
  participating threads (empty to begin with), and one value slot per thread,
  each starting out as ::init. Values count as converged once
  (converged? values) returns true."
  [n converged?]
  (let [slots (into [] (repeat n ::init))]
    (atom {:converged? converged? ; the convergence predicate
           :threads    []         ; threads taking part, in arrival order
           :values     slots})))  ; the value each thread most recently reached
(defn await-converger-change
  "Parks the calling thread, passing the converger c as the blocker object.
  The call may return for no reason at all, so callers must re-check the
  converger's state afterwards rather than assume anything changed."
  [c]
  ; Returns once this thread is unparked (signal-converger-change! does that
  ; for every registered thread), or spuriously.
  (LockSupport/park c))
(defn signal-converger-change!
  "Announces that converger c's state has changed by unparking every thread
  currently listed under :threads. Returns nil."
  [c]
  (run! #(LockSupport/unpark %) (:threads @c)))
(defn stable?
  "True when every value in converger state c has settled, i.e. none of them
  is still ::init or ::evolving."
  [c]
  (let [unsettled #{::init ::evolving}]
    (every? #(not (contains? unsettled %)) (:values c))))
(defn divergent?
  "Truthy when converger state c is divergent: either some value is still
  ::init, or the values that are not currently ::evolving fail the state's
  :converged? predicate. With no settled values at all there is nothing to
  disagree about, so the result is falsey."
  [c]
  (let [values (:values c)]
    (or (some #{::init} values)
        (let [settled (remove #{::evolving} values)]
          ; Only consult the predicate when at least one value has settled
          (when (seq settled)
            (not ((:converged? c) settled)))))))
(defn converged?
  "True when converger state c is both stable (no value is initial or
  evolving) and not divergent."
  [c]
  (if (stable? c)
    (not (divergent? c))
    false))
(defn evolve!
  "Runs one evolution step for thread index i. Reads that thread's current
  value from the converger (substituting init if the slot is still ::init),
  marks the slot ::evolving, calls (evolve value) to obtain value', stores
  value' back in the slot, logs the transition, and finally wakes every
  registered thread. Returns nil."
  [converger evolve init i]
  (let [previous (atom nil)
        ; Marks slot i as evolving, remembering what was there. swap! may call
        ; this more than once; the reset! from the run that commits is the one
        ; that sticks.
        claim    (fn [state]
                   (let [current (nth (:values state) i)]
                     (reset! previous (if (= ::init current) init current)))
                   (assoc-in state [:values i] ::evolving))]
    ; Take the current value out of the converger
    (swap! converger claim)
    ; Compute the successor and publish it
    (let [value' (evolve @previous)]
      (swap! converger assoc-in [:values i] value')
      (info @previous '-> value'))
    ; Tell the other threads the values have changed
    (signal-converger-change! converger)
    nil))
(defn converge!
  "Takes a converger, an initial value, and a function which evolves values
  over time.

  When `converge!` is called, evaluates `evolve` repeatedly, starting with the
  initial value; each successive invocation receives the result of the previous
  invocation. Returns this thread's value only when (converged? [v1 v2 ...])
  returns truthy.

  Always invokes evolve at least once; the initial state doesn't count as
  converged.

  The calling thread registers itself with the converger and is assigned the
  next free index, in arrival order. Presumably at most n threads (the n given
  to `converger`) should call this on one converger -- TODO confirm.

  Example:

    (let [c (converger 3 (fn [colls] (apply = (map peek colls))))]
      ; on each of three threads, with a distinct i:
      (converge! c [i] (fn [coll] (conj coll (rand-int 2)))))"
  [converger init evolve]
  ; Acquire our unique thread index
  (let [i (-> (swap! converger update :threads conj (Thread/currentThread))
              :threads
              count
              dec)]
    (loop []
      ; Take ONE snapshot per iteration, so that the converged/divergent/wait
      ; decision and the value we return all refer to the same state.
      ;
      ; NOTE(review): the decision to evolve is still made on a snapshot that
      ; may be stale by the time evolve! claims our slot; closing that window
      ; would require evolve! itself to re-check divergence atomically.
      (let [c @converger]
        (cond ; If we're converged, return our value from that very state
              (converged? c)
              (get (:values c) i)

              ; If we're divergent, evolve
              (divergent? c)
              (do (info i "evolving")
                  (evolve! converger evolve init i)
                  (recur))

              ; Otherwise, wait for something to change
              :else
              (do (info i "waiting")
                  ; Re-check immediately before parking. If the state moved
                  ; while we were logging, skip the park: anything that parks
                  ; internally between our snapshot and here may have used up
                  ; the unpark permit a signaller gave us, and we would then
                  ; sleep through the change. A change after this check
                  ; unparks us, so the park below returns promptly.
                  (when (identical? c @converger)
                    (await-converger-change converger))
                  (recur)))))))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
15:25:34.414 [real-pmap 0] INFO jepsen.watch - 0 evolving
15:25:34.414 [real-pmap 1] INFO jepsen.watch - 2 evolving
15:25:34.414 [real-pmap 2] INFO jepsen.watch - 1 evolving
15:25:34.425 [real-pmap 0] INFO jepsen.watch - [0] -> [0 1]
15:25:34.425 [real-pmap 1] INFO jepsen.watch - [1] -> [1 1]
15:25:34.425 [real-pmap 2] INFO jepsen.watch - [2] -> [2 0]
15:25:34.426 [real-pmap 2] INFO jepsen.watch - 1 evolving
15:25:34.426 [real-pmap 1] INFO jepsen.watch - 2 evolving
15:25:34.426 [real-pmap 0] INFO jepsen.watch - 0 evolving
15:25:34.426 [real-pmap 1] INFO jepsen.watch - [1 1] -> [1 1 0]
15:25:34.426 [real-pmap 0] INFO jepsen.watch - [0 1] -> [0 1 1]
15:25:34.426 [real-pmap 1] INFO jepsen.watch - 2 evolving
15:25:34.426 [real-pmap 0] INFO jepsen.watch - 0 evolving
15:25:34.426 [real-pmap 1] INFO jepsen.watch - [1 1 0] -> [1 1 0 1]
15:25:34.426 [real-pmap 0] INFO jepsen.watch - [0 1 1] -> [0 1 1 1]
15:25:34.426 [real-pmap 1] INFO jepsen.watch - 2 waiting
15:25:34.426 [real-pmap 0] INFO jepsen.watch - 0 waiting
15:25:34.426 [real-pmap 1] INFO jepsen.watch - 2 waiting
15:25:34.427 [real-pmap 2] INFO jepsen.watch - [2 0] -> [2 0 1]
:vs [[0 1 1 1] [1 1 0 1] [2 0 1]]
Ran 1 tests containing 2 assertions.
0 failures, 0 errors.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(deftest converge-test
  ; Each thread keeps appending a random 0 or 1 to its own vector until the
  ; last element of every vector is the same.
  (let [n          3
        ; Converged once all collections end in the same element
        same-tail? (fn [colls] (apply = (map peek colls)))
        ; One evolution step: brief random pause, then append a random digit
        grow       (fn [coll]
                     (Thread/sleep (rand-int 2))
                     (conj coll (rand-int 2)))
        c          (converger n same-tail?)
        vs         (dt/real-pmap #(converge! c [%] grow) (range n))]
    (prn :vs vs)
    ; Every result still begins with the element it was seeded with
    (is (= (range n) (map first vs)))
    ; Every result ends with the same element
    (is (apply = (map peek vs)))))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment