Skip to content

Instantly share code, notes, and snippets.

@base698 base698/Work-queue.clj
Last active Mar 11, 2016

Embed
What would you like to do?
(defn work-queue
  "Processes every job in `jobs` with function `f`, using `go-threads`
  concurrent go-block workers.

  Arguments:
    go-threads           - number of worker go blocks to start.
    jobs                 - finite collection of jobs (it is counted up front).
                           Each job is a sequence of arguments that is passed
                           to `f` via `apply`.
    f                    - function applied to the arguments of each job.
    notification-channel - optional channel. For every processed job it
                           receives the result of `f` followed by the string
                           \"DONE\". Because nil cannot be put on a channel,
                           `f` must not return nil when this is supplied.
    cb                   - optional zero-argument function, called once after
                           all jobs have been processed and the internal
                           channels have been closed.

  Returns nil immediately; all work happens asynchronously in go blocks."
  ([go-threads jobs f] (work-queue go-threads jobs f nil nil))
  ([go-threads jobs f cb] (work-queue go-threads jobs f nil cb))
  ([go-threads jobs f notification-channel cb]
   (let [work-chan (chan)       ; jobs waiting to be picked up by a worker
         jobs-count (count jobs)
         timeout-in-s 300       ; only used by the disabled timeout guard below
         done-chan (chan)]      ; one message per finished job
     ;; Disabled global timeout guard, kept for reference:
     ;; (go
     ;;   (<! (timeout (* 1000 timeout-in-s)))
     ;;   (prn "Timeout hit")
     ;;   (close! work-chan)
     ;;   (close! done-chan)
     ;;   (if notification-channel (close! notification-channel)))

     ;; Feeder: a single go block puts the jobs one after another. Spawning one
     ;; go block per job would leave one pending put per job on the unbuffered
     ;; channel and trip core.async's limit of 1024 pending puts.
     (go
       (doseq [job jobs]
         (>! work-chan job)))

     ;; Monitor: counts completion messages. When every job is accounted for it
     ;; closes the channels (which stops the workers), calls `cb`, and exits.
     ;; Waiting for the next completion must be the else branch; otherwise the
     ;; loop keeps recurring on the closed channel forever after finishing.
     (go
       (loop [finished 0]
         (if (= finished jobs-count)
           (do (println "Closing")
               (close! work-chan)
               (close! done-chan)
               (when cb (cb)))
           (do (<! done-chan)
               (recur (inc finished))))))

     ;; Workers: each one takes jobs until the work channel is closed (a take
     ;; then yields nil), reports the result, and signals completion.
     (doseq [_ (range go-threads)]
       (go
         (loop [job (<! work-chan)]
           (when-not (nil? job)
             (let [result (apply f job)]
               (when notification-channel (>! notification-channel result))
               (>! done-chan "Done")
               (when notification-channel (>! notification-channel "DONE"))
               (recur (<! work-chan))))))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.