Skip to content

Instantly share code, notes, and snippets.

@gerritjvv
Created October 30, 2013 11:07
Show Gist options
  • Save gerritjvv/7230860 to your computer and use it in GitHub Desktop.
Save gerritjvv/7230860 to your computer and use it in GitHub Desktop.
This is for a situation where you have N amount of threads reading from different sources and want to consume all of the data they produce as a single sequence. Can be described as merging N queues from different sources and works well when the data produced is from IO. e.g. My usage is with kafka, I have multiple kafka topics and partitions to …
(use 'clojure.core.async)
;this is the function you want to use
(defn lazy-channels [chs]
(lazy-seq (cons (let [ [v _] (alts!! chs)] v) (lazy-channels chs))))
;now for the tesging
(def chs [ (chan) (chan) (chan) ]) ; the channels can come from anywhere, here we are using three channels for testing
(thread (dotimes [i 1000]
(>!! (rand-nth chs) (str "m-" i)))) ;add 1000 elements to a random selection of channels
;create a sequence
(def s (lazy-channels chs))
;now consume, please note that this will block when no more data is available on the channels
(doseq [msg s]
(prn msg))
@rodnaph
Copy link

rodnaph commented Oct 31, 2013

For my education, is recur not needed in lazy-channels?

https://gist.github.com/rodnaph/7238399

@stathissideris
Copy link

@rodnaph : No, recur is not needed because the call to lazy-channels is not truly recursive since it appears within lazy-seq. lazy-seq does not compute the code that it contains until the elements of the sequence is requested.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment