Skip to content

Instantly share code, notes, and snippets.

@holyjak
Created April 16, 2021 07:42
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save holyjak/61d89610e488f31d8c53b8bbbad299c1 to your computer and use it in GitHub Desktop.
Save holyjak/61d89610e488f31d8c53b8bbbad299c1 to your computer and use it in GitHub Desktop.
(ns catching-transduce
"See [[catching-transduce]] below"
(:require [clojure.core.async :as a :refer [chan to-chan pipeline-blocking <!!]]))
(defmacro err-or
"If body throws an exception, catch it and return it"
[& forms]
`(try
~@forms
(catch Throwable t# t#)))
(def throwable? (partial instance? Throwable))
(defn catch-ex-as-data
"Transducer that catches errors from the transducers below (catching errors
both in the transducing and reducing functions) and returns the first one.
Params:
- `on-error` - call with the exception if there is one when done
It should be first, i.e. at the top of `(comp (catch-ex-as-data) ...)`)"
([] (catch-ex-as-data nil))
([on-error]
(fn [xf]
(fn
([] (err-or (xf)))
([result]
(let [res (if (throwable? result)
result ; don't pass anomalies down
(err-or (xf result)))]
(when (and on-error (throwable? res))
(on-error res))
res))
([result input]
(try (xf result input)
(catch Throwable t
(reduced t))))))))
(defn drain
"Close ch and discard all items on it. Returns nil.
Beware: This does not drain the channel that `ch` reads from so
it only stops things upstream, not downstream."
[ch]
(a/close! ch)
(a/go-loop []
(when (a/<! ch) (recur)))
nil)
(with-test
(defn catching-transduce
"Similar to `core.async/transduce` but returns the reduced value and
captures 'anomalies' (i.e. Java's `Throwable` sent as data) in the `chan` data and
captures exceptions in `xf` and `f`, stopping at the first one.
Returns the result or throws if there was any anomaly / exception."
[xf f init ch]
(let [[err-ch data-ch] (a/split throwable? ch)
;; ALTERNATIVE IMPL: Upon anomaly discovery in `ch`, `untap[-all]` the
;; data chan + close it, consume the test of `ch` counting
;; # items / errors
errors-ch (a/into [] err-ch)
data-cnt (atom 0)
result-ch (->>
data-ch
(a/transduce
(comp
;; BEWARE: We must drain *both* the channels
;; otherwise the other one stays open and we get stuck
(catch-ex-as-data (fn [_] (run! drain [ch data-ch])))
(map #(do (swap! data-cnt inc) %))
xf)
f
init))
[val src] (a/alts!! [result-ch errors-ch])
result (if (= src result-ch) val (a/<!! result-ch))
errs (if (= src errors-ch) val (a/<!! errors-ch))]
(cond
(seq errs) (throw (ex-info (format "There were %d errors (%d ok) in the input; first error: %s"
(count errs) @data-cnt (first errs))
{:errs errs}
(first errs)))
(throwable? result) (throw (ex-info (str "Data transformation failed:" result) {} result))
:else result)))
(is ; This failed before we started draining both ch and data-ch
(thrown-with-msg?
RuntimeException
#"Data transformation failed"
(catching-transduce
(map #(when (= :throw %)
(throw (RuntimeException. "Poisonous throw!"))))
conj
[]
;; :ok? :throw :ok :throw, :throw :ok :ok, :throw :throw :throw
(a/to-chan [:throw
(rand-nth [:ok :throw])
(rand-nth [:ok :throw])])))
"Error during transduce followed by any 2+ things")
(is (= [1 3]
(catching-transduce
(map identity)
conj
[]
(a/to-chan [1 #_(RuntimeException. "FAKE") 3])))
"Successful data processing")
(is (thrown-with-msg?
RuntimeException
#"FAKE"
(catching-transduce
(map identity)
conj
[]
(a/to-chan [1 (RuntimeException. "FAKE") 3])))
"Error in the input channel")
(is (thrown-with-msg?
RuntimeException
#"Data transformation failed:java.lang.RuntimeException: Poisonous two!"
(catching-transduce
(map #(if (= 2 %)
(throw (RuntimeException. "Poisonous two!"))
%))
conj
[]
(a/to-chan [1 2 3])))
"Error during transduce [middle]")
#_(is (thrown-with-msg?
RuntimeException
#"Data transformation failed:java.lang.RuntimeException: Poisonous two!"
(catching-transduce
(map (fn [_] (throw (RuntimeException. "Poisonous two!"))))
conj
[]
(a/to-chan [1 2])))
"Error during transduce"))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment