@igrishaev
Created March 21, 2018 08:03
;;
;; Workers
;;
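(defn worker-start
  "Starts a background consumer loop for `topic`; returns a worker map."
  [cfg topic func]
  (let [p (promise)]
    {:p p
     :f (future
          (with-consumer c cfg
            (consumer-subscribe c topic)
            ;; Poll until the stop promise is delivered by `worker-stop`.
            (while (not (realized? p))
              (when-let [msgs (not-empty (consumer-poll c))]
                ;; `pmap` is lazy: force it so `func` actually runs.
                (dorun (pmap func msgs))))))}))
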
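(defn worker-stop
  "Signals the polling loop to exit by delivering the stop promise."
  [w]
  (deliver (:p w) true))

(defn worker-status
  "Returns true while the worker's future is still running."
  [w]
  (and
   (not (future-cancelled? (:f w)))
   (not (realized? (:f w)))))

(defn worker-error
  "Returns the exception raised by deref'ing a finished worker, or nil."
  [w]
  (when-not (worker-status w)
    (try
      (-> w :f deref)
      nil
      (catch Exception e
        e))))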
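
;;
;; Usage (a sketch, not part of the original gist)
;;
;; Shows how the four functions above might fit together at the REPL.
;; It assumes `with-consumer`, `consumer-subscribe` and `consumer-poll`
;; are defined elsewhere. `cfg`, the topic name "events" and the
;; `handle-message` function are hypothetical placeholders.
(comment
  (defn handle-message [msg]
    (println "got" msg))

  ;; Starts polling in a background future.
  (def w (worker-start cfg "events" handle-message))

  ;; true while the polling loop is still running.
  (worker-status w)

  ;; Asks the loop to exit; it stops once the current poll and batch finish.
  (worker-stop w)

  ;; nil while running or after a clean exit, otherwise the exception
  ;; thrown when deref'ing the future.
  (worker-error w))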