@terjesb
Forked from candera/fanout.clj
Created November 28, 2013 18:45
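(require '[clojure.core.async :as async])

;; Serialize output through a shared lock so lines printed from
;; different threads do not interleave.
(let [l (Object.)]
  (defn log
    [fmt & args]
    (locking l
      (apply printf fmt args)
      (flush))))

(let [req     (async/chan 10)          ; work items fanned out to the workers
      resp    (async/chan 20)          ; results coming back from the workers
      timeout (async/timeout 60000)    ; overall deadline of 60 seconds
      n       20]                      ; number of work items
  ;; Start 4 workers; each takes from req until the channel is closed.
  (dotimes [i 4]
    (async/go-loop []
      (if-let [val (async/<! req)]
        (do
          (log "Worker %d received value %d\n" i val)
          ;; Simulated work. Note that Thread/sleep blocks a thread in the go
          ;; thread pool for its duration.
          (Thread/sleep 1000)
          (log "Worker %d returning success on value %d\n" i val)
          (async/>! resp :success)
          (recur))
        :done)))
  ;; Feed the values to req while collecting responses, until all n succeed.
  (loop [[head & more :as coll] (range n)
         successes 0]
    (if (= n successes)
      :succeeded
      (do
        ;; No values left to send: close req so idle workers terminate.
        (when-not head (async/close! req))
        ;; Wait on resp and timeout, plus a put of head onto req if any remain.
        (let [ports (into [resp timeout] (when head [[req head]]))
              [val port] (async/alts!! ports)]
          (cond
            (= port req) (do (log "Put value %d on request channel\n"
                                  head)
                             (recur more successes))
            (= port resp) (if (= val :success)
                            (do (log "Got success# %d\n" (inc successes))
                                (recur coll (inc successes)))
                            (throw (ex-info "Unexpected response" {:response val})))
            (= port timeout) (throw (ex-info "Timed out" {:reason :timed-out}))))))))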