@holyjak
Created July 10, 2020 17:57
catching-transduce and a mysterious freeze
;; catching-transduce - works most of the time but blocks forever in some case(s)
;; If I interrupt it, it is at line 26, i.e. inside `(a/<!! errors-ch)`
;; inside core.async ... CountDownLatch.await
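;; The code below relies on the alias `a` and on three helpers that this gist
;; does not define: `throwable?`, `catch-ex-as-data` and `drain`. What follows
;; is only a hypothetical sketch of what they might look like, inferred from
;; how they are called; the author's real definitions may differ.
(require '[clojure.core.async :as a]
         '[clojure.test :refer [is with-test]])

(defn throwable?
  "Assumed helper: true if `x` is an exception sent as data."
  [x]
  (instance? Throwable x))

(defn drain
  "Assumed helper: takes and discards everything left on `ch` until it closes,
  so that whoever is putting onto `ch` is not left parked. Does not close `ch`."
  [ch]
  (a/go-loop []
    (when (some? (a/<! ch))
      (recur))))

(defn catch-ex-as-data
  "Assumed helper: a transducer that catches anything thrown by the downstream
  steps, calls `on-error` with it, and ends the reduction with the exception
  as the (reduced) result."
  [on-error]
  (fn [rf]
    (fn
      ([] (rf))
      ([result] (rf result))
      ([result input]
       (try
         (rf result input)
         (catch Throwable t
           (on-error t)
           (reduced t)))))))
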
(with-test
  (defn catching-transduce
    "Similar to `core.async/transduce` but returns the reduced value and
    captures 'anomalies' (i.e. exceptions sent as data) in the `chan` data and
    captures exceptions in `xf` and `f`, stopping at the first one.
    Returns the result or throws if there was any anomaly / exception."
    [xf f init ch]
    (let [[err-ch data-ch] (a/split throwable? ch)
          errors-ch (a/into [] err-ch)
          data-cnt (atom 0)
          result-ch (->>
                      data-ch
                      (a/transduce
                        (comp
                          (catch-ex-as-data (fn [_] (drain data-ch)))
                          (map #(do (swap! data-cnt inc) %))
                          xf)
                        f
                        init))
          [val src] (a/alts!! [result-ch errors-ch])
          result (if (= src result-ch) val (a/<!! result-ch))
          errs (if (= src errors-ch) val (a/<!! errors-ch))]
      (cond
        (seq errs) (throw (ex-info (format "Fetching data failed for %d (ok for %d); first error: %s"
                                           (count errs) @data-cnt (first errs))
                                   {:errs errs}
                                   (first errs)))
        (throwable? result) (throw (ex-info (str "Data transformation failed:" result) {} result))
        :else result)))
  (is (= [1 3]
         (catching-transduce
           (map identity)
           conj
           []
           (a/to-chan [1 #_(RuntimeException. "FAKE") 3])))
      "Successful data processing")
  (is (thrown-with-msg?
        RuntimeException
        #"FAKE"
        (catching-transduce
          (map identity)
          conj
          []
          (a/to-chan [1 (RuntimeException. "FAKE") 3])))
      "Error in the input channel")
  (is (thrown-with-msg?
        RuntimeException
        #"Data transformation failed:java.lang.RuntimeException: Poisonous two!"
        (catching-transduce
          (map #(if (= 2 %)
                  (throw (RuntimeException. "Poisonous two!"))
                  %))
          conj
          []
          (a/to-chan [1 2 3])))
      "Error during transduce"))