@gardnervickers
Created March 23, 2016 03:36
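;; Requires inferred from usage; the original gist omits its namespace setup.
;; `l` is assumed to alias clojure.core.async.lab (source of `broadcast` and `spool`).
(require '[clojure.core.async :refer [chan go-loop <! >! alts! pipe]]
         '[clojure.core.async.lab :as l]
         '[clojure.pprint])

;; Sample stream: ordinary messages with one barrier marker in the middle.
(def msg-seq [{:msg 1} {:msg 2} {:msg 3} {:type :barrier} {:msg 4} {:msg 5}])
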
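;; Starts a peer that appends every message it receives to its own vector,
;; stored under `id` in the atom `a`. Returns the peer's input channel.
(defn start-peer [id a]
  (let [ch (chan)]
    (go-loop [msg (<! ch)]
      (when msg
        (swap! a update id conj msg)
        (recur (<! ch))))
    ch))
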
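;; Forwards each incoming message to exactly one of `chans`: `alts!` given
;; [channel value] pairs completes a single put on whichever channel is ready.
(defn start-alts [& chans]
  (let [ch (chan)]
    (go-loop [msg (<! ch)]
      (when msg
        (alts! (mapv (fn [c] [c msg]) chans))
        (recur (<! ch))))
    ch))
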
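;; Routes each incoming message to `t` when `(pred msg)` is truthy, else to `f`.
(defn start-router [pred t f]
  (let [ch (chan)]
    (go-loop [msg (<! ch)]
      (when msg
        (println msg) ; debug trace of every routed message
        (if (pred msg)
          (>! t msg)
          (>! f msg))
        (recur (<! ch))))
    ch))
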
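;; Barrier messages are broadcast to all three peers; every other message
;; goes to whichever single peer accepts it first.
(let [db (atom {:p1 [] :p2 [] :p3 []})
      p1 (start-peer :p1 db)
      p2 (start-peer :p2 db)
      p3 (start-peer :p3 db)
      barrier-chan (l/broadcast p1 p2 p3)
      alts-chan (start-alts p1 p2 p3)
      router-chan (start-router (fn [m] (= (:type m) :barrier))
                                barrier-chan
                                alts-chan)]
  (pipe (l/spool msg-seq) router-chan)
  (Thread/sleep 2000) ; crude wait for the go-loops to drain the messages
  (clojure.pprint/pprint @db))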