Created
January 30, 2014 05:42
merge a channel of channels in core.async
(require '[clojure.core.async :as async])

(defn merge+
  "Takes a *channel* of source channels and returns a channel which
  contains all values taken from them. The returned channel will be
  unbuffered by default, or a buf-or-n can be supplied. The channel
  will close after all the source channels have closed."
  ([in-ch] (merge+ in-ch nil))
  ([in-ch buf-or-n]
   (let [out-ch (async/chan buf-or-n)]
     ;; cs holds every channel still open, starting with in-ch itself
     (async/go-loop [cs [in-ch]]
       (if-not (empty? cs)
         (let [[v c] (async/alts! cs)]
           (cond
             ;; c has closed: stop listening to it
             (nil? v)
             (recur (filterv #(not= c %) cs))

             ;; a value from in-ch is a new source channel
             (= c in-ch)
             (recur (conj cs v))

             ;; a value from a source channel: forward it
             :else
             (do
               (async/>! out-ch v)
               (recur cs))))
         (async/close! out-ch)))
     out-ch)))

(comment
  (->> (async/to-chan [(async/to-chan [1 2 3])
                       (async/to-chan [8 9 10 11])
                       (async/to-chan [4 5 6 7])
                       (async/to-chan [12])
                       (async/to-chan [13 14 15 16 17 18])])
       (merge+)
       (async/into [])
       (async/<!!))
  ;=>
  [1 2 8 12 3 4 13 5 6 9 14 10 11 15 16 17 7 18])
Thank you for the gist. I spent about a day trying to implement a flatmap function, and after some time I found your merge+ function, which does exactly this.
Another possibility is to construct an async/mix and just admix all the channels in.
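
A minimal sketch of what the mix-based approach might look like. The helper name `mix-all` is hypothetical and not part of core.async; only `async/mix` and `async/admix` are library calls. One difference worth noting: as far as I can tell, a mix does not close its output channel when the inputs close, so unlike merge+ the returned channel stays open here.

```clojure
(require '[clojure.core.async :as async])

(defn mix-all
  "Hypothetical helper: admixes every channel received on in-ch into a
  mix over a fresh output channel. The output channel is NOT closed
  automatically when the sources are exhausted."
  ([in-ch] (mix-all in-ch nil))
  ([in-ch buf-or-n]
   (let [out-ch (async/chan buf-or-n)
         m      (async/mix out-ch)]
     (async/go-loop []
       ;; each value taken from in-ch is itself a source channel
       (when-let [c (async/<! in-ch)]
         (async/admix m c)
         (recur)))
     out-ch)))

(comment
  ;; The output never closes, so take a known number of values.
  (->> (async/to-chan [(async/to-chan [1 2 3])
                       (async/to-chan [4 5 6])])
       (mix-all)
       (async/take 6)
       (async/into #{})
       (async/<!!)))
```

If the close-when-done behavior matters, the explicit go-loop in merge+ is probably the simpler route.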
BTW, a nice thing about channels (or Clojure) is that channels can be put into a set, so you can call disj to remove a channel instead of using filterv with #(not= c %).
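
A sketch of the set-based variant, under the name `merge-set+` (hypothetical, chosen only to avoid clashing with the gist's merge+). One caveat: `alts!` expects a vector of ports, so the set is converted with `vec` on each iteration.

```clojure
(require '[clojure.core.async :as async])

(defn merge-set+
  "Same idea as merge+, but tracks the open channels in a set so that a
  closed channel can be dropped with disj."
  ([in-ch] (merge-set+ in-ch nil))
  ([in-ch buf-or-n]
   (let [out-ch (async/chan buf-or-n)]
     (async/go-loop [cs #{in-ch}]
       (if (seq cs)
         ;; alts! needs a vector, so convert the set before the call
         (let [[v c] (async/alts! (vec cs))]
           (cond
             (nil? v)    (recur (disj cs c))   ; c closed: drop it
             (= c in-ch) (recur (conj cs v))   ; new source channel
             :else       (do (async/>! out-ch v)
                             (recur cs))))
         (async/close! out-ch)))
     out-ch)))

(comment
  ;; Collect into a set because the interleaving order is not fixed.
  (->> (async/to-chan [(async/to-chan [1 2 3])
                       (async/to-chan [4 5])])
       (merge-set+)
       (async/into #{})
       (async/<!!)))
```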