Skip to content

Instantly share code, notes, and snippets.

@hiredman
Created April 22, 2014 21:12
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hiredman/11194485 to your computer and use it in GitHub Desktop.
Save hiredman/11194485 to your computer and use it in GitHub Desktop.
(ns com.thelastcitadel.pipeline
(:require [clojure.core.async :refer
[chan >!! <!! <! close! go thread go-loop]]))
(defn pipeline [things]
{:pre [(or (even? (count things))
(every? #(and (vector? %)
(= 2 (count %))) things))]}
(let [x (->> (if (vector? (first things))
things
(partition-all 2 things))
(map (fn [[buf-size fun]]
{:in (chan buf-size)
:buffer-size buf-size
:fun fun}))
(partition-all 2 1)
(map (fn [[step {from-out :in :as o}]]
(assoc step
:worker-count (or (and o (min (:buffer-size step)
(:buffer-size o)))
(:buffer-size step))
:out (or from-out (chan)))))
(mapv (fn [{:keys [worker-count in out fun] :as step}]
(assoc step
:workers (vec
(repeatedly
worker-count
(fn []
(future
(loop [i (<!! in)]
(if i
(do
(doseq [ii (fun i)]
(>!! out ii))
(recur (<!! in)))
(close! out)))))))))))]
{:in (:in (first x))
:out (:out (last x))
:network x}))
(comment
(pipeline
[10 (comp list inc)
10 (comp list inc)
10 (comp list identity)])
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment