Single consumer, multiple producers in core.async
(require '[clojure.core.async :refer [chan go thread <! >! >!! alts!! close!]])
(def output (atom []))
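;; Each producer waits for the shared channel c to arrive on ctrl, puts one
;; value on it, then hands c back through ctrl so another producer can take
;; a turn. Once ctrl is closed and drained, <! returns nil and the loop ends.
(defn producer [ctrl k]
  (go (loop [i 0]
        (when-let [c (<! ctrl)]
          (>! c [k i])
          (>! ctrl c)
          (recur (inc i))))))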
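(defn consumer
  ([stop] (consumer stop nil))
  ([stop buf-or-n]
   (let [ctrl (chan 1)
         c (chan buf-or-n)]
     (>!! ctrl c) ; make c available to the first producer that takes from ctrl
     ;; alts!! blocks, so the consuming loop runs on its own thread
     (thread
       (loop []
         (let [[v p] (alts!! [stop c] :priority true)]
           (if-not (identical? stop p)
             (do (swap! output conj v)
                 (Thread/sleep 500) ; <- for the sake of the demo
                 (recur))
             (close! ctrl)))))
     ctrl))) ; returned so that start can pass it to the producers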
(defn start
  ([producers] (start producers nil))
  ([producers buf-or-n]
   (let [stop (chan 1)
         ctrl (consumer stop buf-or-n)]
     (dotimes [k producers]
       (producer ctrl k))
     ;; the returned function stops the consumer when called
     (fn []
       (>!! stop true)))))
(def stop (start 10))
(count @output)

timgluz commented Oct 28, 2013

Hi, thanks for the nice example.

I'm currently learning core.async and am also trying to build a multiple-producer/consumer setup.

But one thing confuses me.
Is there any reason why you're passing a channel through another channel?
Is it some kind of pattern, or just an experiment?

