I start from a basic core.async snippet and introduce exceptions into the producer and consumer code to explore how to handle them.
(require '[clojure.core.async :as a :refer (<! <!! >! chan go)])

(defn produce-values [ch]
  (go
    (let [result
          (loop [n 0]
            (if (< n 10)
              (do (>! ch n) (recur (inc n)))
              {:count n}))]
      (a/close! ch)
      result)))

(defn consume-values [ch]
  (go
    (loop [n 0]
      (if-let [v (<! ch)]
        (do
          (println v)
          (recur (inc n)))
        {:count n}))))

(<!! (go
       (let [c (chan)
             pdr (produce-values c)
             csr (consume-values c)]
         {:pdr (<! pdr)
          :csr (<! csr)})))
I don't know if this is the simplest pattern to start with, but it works well (or at least crashes well) for me.
First, I replace the whole producer function with a failing one.
(defn produce-values [ch]
  (go
    (throw (Exception. "my exception message"))))
As expected, the evaluation never returns: nothing closes the channel, so the consumer waits on it forever.
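While experimenting with code that may never return, a blocking <!! can freeze the REPL. A minimal sketch of a guard, assuming the standard a/timeout and a/alts!! functions; the helper name take-with-timeout and the timeout values are hypothetical choices for illustration:

```clojure
(require '[clojure.core.async :as a :refer [chan]])

;; Hypothetical helper: wait for a value on ch, but give up after ms
;; milliseconds and return :timed-out instead of blocking forever.
(defn take-with-timeout [ch ms]
  (let [t (a/timeout ms)
        [v port] (a/alts!! [ch t])]
    (if (= port t) :timed-out v)))

;; A channel nobody ever writes to or closes, like the one the consumer
;; waits on when the producer throws before closing it.
(take-with-timeout (chan) 100) ; gives up after roughly 100 ms
```

Wrapping the outer go block of the snippet in such a helper, instead of a bare <!!, would let the experiments fail fast rather than hang.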
Wrapping the whole go body in a try/catch and closing the channel in finally fixes it.
(defn produce-values [ch]
  (go
    (try
      (throw (Exception. "my exception message"))
      (catch Throwable t (println (.getMessage t)) :error)
      (finally (a/close! ch)))))
The consumer side is the difficult part for me. I found some ways to make it "work", but I would like to find better solutions.
I go back to the initial producer and write the consumer as:
(defn consume-values [ch]
  (go
    (throw (Exception. "my exception message"))))
Same problem: the code blocks.
(defn consume-values [ch]
  (go
    (try
      (throw (Exception. "my exception message"))
      (catch Throwable t (println (.getMessage t)) :error)
      (finally (a/close! ch)))))
The exception is caught, but the code is still broken.
Moving the close! call into the catch clause fixes the problem. Why?
I am probably missing some subtleties about catch and finally.
(defn consume-values [ch]
  (go
    (try
      (throw (Exception. "my exception message"))
      (catch Throwable t (a/close! ch) (println (.getMessage t)) :error))))
I don't like this fix.
- It differs from the previous one for a reason I don't understand.
- Closing the channel should be the producer's responsibility, no?
I also experimented with the code: removing the wait on the producer ((<! pdr)) and giving the channel a buffer larger than the number of values emitted.
My understanding is that the producer can't write to the channel because the previous value has not been read. Closing the channel tells the producer that it doesn't need to put any more values.
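The put side of this understanding can be checked at the REPL. A minimal sketch, assuming only the documented behaviour that a put on an already closed channel returns false instead of parking; the names produce-until-closed and closed-result are hypothetical and not part of the code above:

```clojure
(require '[clojure.core.async :as a :refer [<!! >! chan go]])

;; Hypothetical variant of produce-values: stops as soon as a put
;; reports that the channel is already closed (>! returns false).
(defn produce-until-closed [ch]
  (go
    (loop [n 0]
      (if (and (< n 10) (>! ch n))
        (recur (inc n))
        {:count n}))))

;; Close the channel before the producer starts: the first put returns
;; false immediately, so the producer stops without delivering anything.
(def closed-result
  (let [c (chan)]
    (a/close! c)
    (<!! (produce-until-closed c))))
```

As far as I can tell from the close! docstring, a put that is already parked when the channel gets closed stays parked until a taker releases it, so this check only helps for puts attempted after the close. If that reading is right, the outcome may depend on whether the consumer closes the channel before or after the producer's first put.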