Skip to content

Instantly share code, notes, and snippets.

@daveray
Created January 30, 2014 05:42
Show Gist options
  • Save daveray/8703191 to your computer and use it in GitHub Desktop.
Save daveray/8703191 to your computer and use it in GitHub Desktop.
merge a channel of channels in core.async
(defn merge+
"Takes a *channel* of source channels and returns a channel which
contains all values taken from them. The returned channel will be
unbuffered by default, or a buf-or-n can be supplied. The channel
will close after all the source channels have closed."
([in-ch] (merge+ in-ch nil))
([in-ch buf-or-n]
(let [out-ch (async/chan buf-or-n)]
(async/go-loop [cs [in-ch]]
(if-not (empty? cs)
(let [[v c] (async/alts! cs)]
(cond
(nil? v)
(recur (filterv #(not= c %) cs))
(= c in-ch)
(recur (conj cs v))
:else
(do
(async/>! out-ch v)
(recur cs))))
(async/close! out-ch)))
out-ch)))
(comment
(->> (async/to-chan [(async/to-chan [1 2 3])
(async/to-chan [8 9 10 11])
(async/to-chan [4 5 6 7])
(async/to-chan [12])
(async/to-chan [13 14 15 16 17 18]) ])
(merge+)
(async/into [])
(async/<!!))
;=>
[1 2 8 12 3 4 13 5 6 9 14 10 11 15 16 17 7 18])
@ghadishayban
Copy link

Another possibility is to construct an async/mix and just admix all the channels in.

BTW, a nice thing about channels (or Clojure) is that the channels can be put into a set, and call disj to remove a channel, instead of the filterv #(not= c).

@artemyarulin
Copy link

Thank you for the gist. I've spent like a day trying to implement flatmap function and after some time I've found your merge+ function which does the exactly this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment