Created
July 10, 2020 17:57
-
-
Save holyjak/cd744395c8630266fcde92632f01b9b8 to your computer and use it in GitHub Desktop.
catching-transduce and a mysterious freeze
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; catching-transduce - works most of the time but blocks forever in some case(s) | |
;; If I interrupt it, it is at line 26, i.e. inside `(a/<!! errors-ch)` | |
;; inside core.async ... CountDownLatch.await | |
(with-test | |
(defn catching-transduce | |
"Similar to `core.async/transduce` but returns the reduced value and | |
captures 'anomalies' (i.e. exceptions sent as data) in the `chan` data and | |
captures exceptions in `xf` and `f`, stopping at the first one. | |
Returns the result or throws if there was any anomaly / exception." | |
[xf f init ch] | |
(let [[err-ch data-ch] (a/split throwable? ch) | |
errors-ch (a/into [] err-ch) | |
data-cnt (atom 0) | |
result-ch (->> | |
data-ch | |
(a/transduce | |
(comp | |
(catch-ex-as-data (fn [_] (drain data-ch))) | |
(map #(do (swap! data-cnt inc) %)) | |
xf) | |
f | |
init)) | |
[val src] (a/alts!! [result-ch errors-ch]) | |
result (if (= src result-ch) val (a/<!! result-ch)) | |
errs (if (= src errors-ch) val (a/<!! errors-ch))] | |
(cond | |
(seq errs) (throw (ex-info (format "Fetching data failed for %d (ok for %d); first error: %s" | |
(count errs) @data-cnt (first errs)) | |
{:errs errs} | |
(first errs))) | |
(throwable? result) (throw (ex-info (str "Data transformation failed:" result) {} result)) | |
:else result))) | |
(is (= [1 3] | |
(catching-transduce | |
(map identity) | |
conj | |
[] | |
(a/to-chan [1 #_(RuntimeException. "FAKE") 3]))) | |
"Successful data processing") | |
(is (thrown-with-msg? | |
RuntimeException | |
#"FAKE" | |
(catching-transduce | |
(map identity) | |
conj | |
[] | |
(a/to-chan [1 (RuntimeException. "FAKE") 3]))) | |
"Error in the input channel") | |
(is (thrown-with-msg? | |
RuntimeException | |
#"Data transformation failed:java.lang.RuntimeException: Poisonous two!" | |
(catching-transduce | |
(map #(if (= 2 %) | |
(throw (RuntimeException. "Poisonous two!")) | |
%)) | |
conj | |
[] | |
(a/to-chan [1 2 3]))) | |
"Error during transduce")) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment