Skip to content

Instantly share code, notes, and snippets.

@hiredman
Created January 14, 2021 00:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hiredman/a68a2add9751eb8de3d2776363219e13 to your computer and use it in GitHub Desktop.
Save hiredman/a68a2add9751eb8de3d2776363219e13 to your computer and use it in GitHub Desktop.
(require '[clojure.core.async :as async])
(defn swim [me in out rtt announcements seed f]
(when seed
(async/put! out {:op :ping :from me :to seed}))
(async/go-loop [s {}]
(let [[val chan] (async/alts!
(cons in (for [[_ v] s :when (:chan v)] (:chan v))))
to (fn []
(async/go
(async/<! (async/timeout (* rtt 2 (rand))))
(assoc val :op :timeout)))
gossip (when (seq s)
(rand-nth (keys s)))
s (cond-> s
(not (or (nil? (:gossip val))
(contains? s (:gossip val))))
(assoc (:gossip val) {:state :gossiped
:chan (async/go
(assoc val
:from (:gossip val)
:op :timeout))}))
state (or (get-in s [(:from val) :state]) :init)]
(when (some? val)
(when (:data val)
(f (:data val)))
(case (:op val)
:timeout (case state
:gossiped (do
(async/>! out {:op :ping
:data (f)
:from me
:to (:from val)
:gossip gossip})
(recur (assoc s (:from val) {:state :init
:chan (to)})))
:init (recur (dissoc s (:from val)))
:alive (do
(async/>! out {:op :ping
:data (f)
:from me
:to (:from val)
:gossip gossip})
(recur (assoc s (:from val) {:state :checking
:chan (to)})))
:checking (do
(dotimes [_ 3]
(async/>! out {:op :ping-indirect1
:data (f)
:from me
:to (rand-nth (keys s))
:target (:from val)
:gossip gossip}))
(recur (assoc s (:from val) {:state :suspect
:chan (to)})))
:suspect (do
(async/>! announcements {::part (:from val)
::me me})
(recur (dissoc s (:from val)))))
:pong (do
(when (= state :init)
(async/>! announcements {::join (:from val) ::me me}))
(recur (assoc s (:from val) {:state :alive :chan (to)})))
:ping-indirect1 (do
(async/>! out {:op :ping-indirect2
:from me
:data (f)
:to (:target val)
:return (:from val)
:gossip gossip})
(when (= state :init)
(async/>! announcements {::join (:from val)
::me me}))
(recur (assoc s (:from val) {:state :alive
:chan (to)})))
:ping-indirect2 (do
(async/>! out {:op :pong-indirect2
:data (f)
:from me
:to (:from val)
:return (:return val)
:gossip gossip})
(when (= state :init)
(async/>! announcements {::join (:from val)
::me me}))
(recur (assoc s (:from val) {:state :alive
:chan (to)})))
:pong-indirect2 (do
(when (= state :init)
(async/>! announcements {::join (:from val)
::me me}))
(async/>! out {:op :pong-indirect1
:data (f)
:from me
:to (:return val)
:target (:from val)
:gossip gossip})
(recur (assoc s (:from val) {:state :alive
:chan (to)})))
:pong-indirect1 (let [_ (when (= state :init)
(async/>! announcements {::join (:from val)
::me me}))
s (assoc s (:from val) {:state :alive
:chan (to)})
to2 (async/go
(async/<! (async/timeout rtt))
(assoc val
:from (:target val)
:op :timeout))
state2 (or (get-in s [(:target val) :state])
:init)]
(case state2
:init (do
(async/>! announcements
{::join (:target val) ::me me})
(recur
(assoc s (:target val) {:state :alive
:chan to2})))
(:alive
:checking
:suspect) (recur
(assoc s (:target val) {:state :alive
:chan to2}))))
:ping (do
(async/>! out {:op :pong
:data (f)
:from me
:to (:from val)
:gossip gossip})
(when (= state :init)
(async/>! announcements {::join (:from val) ::me me}))
(recur (assoc s (:from val) {:state :alive :chan (to)}))))))))
(def net (async/chan Long/MAX_VALUE))
(def netp (async/pub net :to))
(def announcements (async/chan))
(async/go-loop []
(prn (async/<! announcements))
(recur))
(def x
{1 (async/sub netp 1 (async/chan))
2 (async/sub netp 2 (async/chan))
3 (async/sub netp 3 (async/chan))})
(def rtt 5000)
(swim 1 (get x 1) net rtt announcements nil +)
(swim 2 (get x 2) net rtt announcements 1 +)
(swim 3 (get x 3) net rtt announcements 1 +)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment