Skip to content

Instantly share code, notes, and snippets.

@devn
Created May 13, 2014 00:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save devn/c1f29b1efabc454c5246 to your computer and use it in GitHub Desktop.
Save devn/c1f29b1efabc454c5246 to your computer and use it in GitHub Desktop.
(defn pipeline [pipeline-config]
{:pre [(even? (count pipeline-config))]}
(let [x (->> (partition-all 2 pipeline-config)
(map (fn [[buf-size fun]]
{:in (async/chan buf-size)
:buffer-size buf-size
:fun fun}))
(partition-all 2 1)
(map (fn [[step {from-out :in :as o}]]
(assoc step
:worker-count (or (and o (min (:buffer-size step)
(:buffer-size o)))
(:buffer-size step))
:out (or from-out (async/chan)))))
(mapv (fn [{:keys [worker-count in out fun] :as step}]
(assoc step
:workers
(let [n-chans (repeatedly
worker-count
(fn []
(async/mapcat< (fn [x] (try (fun x)
(catch Exception e
(println e)
[])))
in)))]
(async/pipe (async/merge n-chans) out))))))]
{:in (:in (first x))
:out (:out (last x))
:network x}))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment