@RutledgePaulV
Last active October 11, 2018 03:15
defqueue.clj
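(require '[clojure.core.async :as async])
(require '[mount.core :as mount])

(defn work-queue
  "Returns a channel served by `parallelism` workers. Put zero-argument
  functions on it; a job that throws an Exception is re-queued up to
  `max-retries` times before its stack trace is printed."
  [buffer max-retries parallelism]
  (let [chan (async/chan buffer)]
    (dotimes [_ parallelism]
      (async/go-loop []
        ;; <! yields nil once the channel is closed and drained, which ends the worker.
        (when-some [i (async/<! chan)]
          ;; A bare function is a first attempt; a map is a re-queued retry.
          (let [{:keys [f retries]} (if (map? i) i {:retries 0 :f i})]
            ;; NOTE: f runs on the go-block thread pool, so it should not block for long.
            (try (f)
                 (catch Exception e
                   (if (> max-retries retries)
                     (async/put! chan {:f f :retries (inc retries)})
                     (.printStackTrace e)))))
          (recur))))
    chan))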
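
(defmacro defqueue
  "Defines a mount state named `sym` whose value is a work-queue channel.
  Stopping the state closes the channel."
  [sym buffer max-retries parallelism]
  ;; Unquote the name directly so defstate defines, and :stop closes, the
  ;; caller's symbol rather than a gensym bound to an unresolved value.
  `(mount/defstate ~sym
     :start (work-queue ~buffer ~max-retries ~parallelism)
     :stop (async/close! ~sym)))

;; A queue named downloader: buffer of 100, up to 2 retries, 4 workers.
(defqueue downloader 100 2 4)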