@hiredman
Last active Nov 27, 2019
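(require '[clojure.core.async :as async]
         '[clojure.core.async.impl.dispatch :as dispatch]
         '[clojure.core.async.impl.protocols :as impl])

;; Adapts an external message queue to a core.async ReadPort.
;; take-message returns the next message (a nil result makes the loop call
;; it again); ack-message and nack-message each take a message.
(defn message-queue-read-port [take-message ack-message nack-message]
  (let [q (java.util.concurrent.LinkedBlockingQueue.)] ; pending take handlers
    (async/thread
      (loop [msg nil
             handler nil]
        (cond
          ;; Have both a message and a handler: try to commit the handler.
          (and msg handler)
          (let [take-cb (do (.lock handler)
                            (try
                              ;; commit returns the callback to invoke
                              (when (impl/active? handler)
                                (impl/commit handler))
                              (finally (.unlock handler))))]
            (if take-cb
              (do
                (ack-message msg)
                (dispatch/run (fn [] (take-cb msg)))
                (recur nil nil))
              ;; Handler is no longer active (e.g. an alts! that already
              ;; completed elsewhere): try the next waiting handler, or
              ;; nack the message if nobody is waiting.
              (if-let [handler (.poll q)]
                (recur msg handler)
                (do
                  (nack-message msg)
                  (recur nil nil)))))

          ;; Have a handler but no message: fetch one.
          handler
          (recur (take-message) handler)

          ;; No handler yet: block until someone calls take!.
          :else
          (recur msg (.take q)))))
    (reify
      impl/ReadPort
      (take! [_ handler]
        (.put q handler)
        ;; nil means no value is immediately available; the handler's
        ;; callback is invoked later via the worker thread.
        nil))))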