Single consumer, multiple producers in core.async
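;; Refer only the names used below; a bare `use` would also pull in
;; core.async names that shadow clojure.core ones, such as reduce and into.
(require '[clojure.core.async
           :refer [chan go thread close! <! >! >!! alts!!]])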
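;; Every value the consumer receives is collected here.
(def output (atom []))

(defn producer
  "Starts a go block that repeatedly takes the data channel from ctrl,
  puts [k i] on it, and hands it back. The loop ends when a take from
  ctrl returns nil (ctrl closed and empty)."
  [ctrl k]
  (go (loop [i 0]
        (when-let [c (<! ctrl)]
          (>! c [k i])
          (>! ctrl c)
          (recur (inc i))))))

(defn consumer
  "Starts a thread that moves values from the data channel into output
  until something arrives on stop. Returns the control channel."
  ([stop]
   (consumer stop nil))
  ([stop buf-or-n]
   (let [ctrl (chan 1)
         c    (chan buf-or-n)]
     (>!! ctrl c)
     (thread
       (loop []
         (let [[v p] (alts!! [stop c] :priority true)]
           (if-not (identical? stop p)
             (do (swap! output conj v)
                 (Thread/sleep 500) ; <- for the sake of the demo
                 (recur))
             (close! ctrl)))))
     ctrl)))

(defn start
  "Starts a consumer and the given number of producers. Returns a
  zero-argument function that stops the consumer."
  ([producers]
   (start producers nil))
  ([producers buf-or-n]
   (let [stop (chan 1)
         ctrl (consumer stop buf-or-n)]
     (dotimes [k producers]
       (producer ctrl k))
     (fn []
       (>!! stop true)))))

(comment
  (def stop (start 10))
  (count @output)
  (stop) ; shut the consumer down
  )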
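
One way to read the design above is that `ctrl` works as a one-slot token holder: whichever producer currently holds the data channel is the only one allowed to write to it. The sketch below isolates that idea under simplifying assumptions. The names `token-demo`, `token`, and `out` are hypothetical and do not appear in the gist, and each worker writes only once instead of looping.

```clojure
(require '[clojure.core.async :as a :refer [chan go <! >! >!! <!! close!]])

;; Hypothetical, stripped-down illustration of token passing:
;; `token` holds a single value, and only the go block that currently
;; holds it writes to `out`.
(defn token-demo [n-workers]
  (let [token   (chan 1)            ; capacity 1: at most one token at rest
        out     (chan n-workers)    ; buffered so no worker parks on its put
        _       (>!! token :go)     ; seed the token
        workers (mapv (fn [k]
                        (go (let [t (<! token)]   ; wait for the token
                              (>! out k)          ; write while holding it
                              (>! token t))))     ; hand the token back
                      (range n-workers))]
    ;; Wait for every worker to finish, then drain `out` into a vector.
    (doseq [w workers] (<!! w))
    (close! out)
    (<!! (a/into [] out))))

;; The order in which workers get the token is not fixed, so sort first.
(sort (token-demo 3)) ; => (0 1 2)
```

In the gist itself the token is the data channel `c`, so closing `ctrl` both withdraws the token and tells producers that reach `(<! ctrl)` to stop.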
timgluz commented Oct 28, 2013

Hi, thanks for the nice example.

I'm currently learning core.async and am also trying to build a multiple-producer/consumer setup.

One thing confuses me, though.
Is there a reason you pass a channel through another channel?
Is it some kind of pattern, or just an experiment?