Skip to content

Instantly share code, notes, and snippets.

@dball
Last active August 29, 2015 14:10
Show Gist options
  • Save dball/7ab19a5b3c348db6949c to your computer and use it in GitHub Desktop.
Save dball/7ab19a5b3c348db6949c to your computer and use it in GitHub Desktop.
Panicky pipeline
(ns transduction
(:require [clojure.core.async :as async]))
(defn panicky
[pipeline-fn n to xf from]
(let [errors (atom #{})
ex-handler (fn [ex]
(async/close! from)
(swap! errors conj ex))
rf (remove (fn [_] (seq @errors)))
finisher (pipeline-fn n to (comp rf xf) from true ex-handler)]
(async/go (let [finished (async/<! finisher)
errors @errors]
(if (seq errors)
errors
finished)))))
(defn do-it
[]
(let [in (async/to-chan (range 1000))
out (async/chan)
xf (map (fn [v] (println v) (throw (Exception. (str v)))))]
(async/into [] out)
(async/<!! (panicky async/pipeline 2 out xf in))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment