@mullr
Created February 7, 2017 21:28
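(require '[clojure.core.async :as async
           :refer [<! >! <!! >!! chan go go-loop]])

;; Starts n workers that take from `from`, run each value through the
;; transducer `xf`, and put the results on `to`. `type` selects real threads
;; (:blocking) or go blocks (:compute). Returns a channel that closes once
;; every worker has finished.
;; Note: `xf` is applied to one input at a time, so stateful transducers
;; do not carry state across inputs.
(defn shovel [n to xf from type]
  (assert (pos? n))
  (let [join-chs (->> (range n)
                      (map (fn [_]
                             (case type
                               :blocking (async/thread
                                           (loop []
                                             (when-let [in-val (<!! from)]
                                               (doseq [out-val (sequence xf [in-val])]
                                                 (>!! to out-val))
                                               (recur))))
                               :compute (go
                                          (loop []
                                            (when-let [in-val (<! from)]
                                              (doseq [out-val (sequence xf [in-val])]
                                                (>! to out-val))
                                              (recur)))))))
                      doall)]
    ;; Close `to` only after all workers are done; closing it from inside each
    ;; worker could drop values that slower workers are still putting.
    (go-loop [[c & cs] join-chs]
      (if c
        (do (<! c)
            (recur cs))
        (async/close! to)))))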
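
(comment
  ;; Push 10000 values through 10 blocking workers and time the run.
  ;; `async-lab/spool` is not defined in this gist.
  (time
    (let [in  (chan)
          out (chan)]
      (async-lab/spool (range 10000) in)
      ;; Drain `out` so the workers' puts can always complete.
      (go-loop []
        (when-let [x (<! out)]
          (recur)))
      (<!! (shovel 10 out (map inc) in :blocking)))))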