@kotarak
Forked from cgrand/meikel-q.clj
Last active January 3, 2016 13:39
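(require '[clojure.core.async :as async])

; Placeholder from the original: supply your own (fn [stats q] new-stats).
(declare update-stats-as-you-see-fit)

(defn queue-process-uncontrolled
  [input output stats]
  (async/go
    (loop [q clojure.lang.PersistentQueue/EMPTY]
      ; Always listen on input; additionally offer the head of the queue
      ; to output whenever the queue is non-empty.
      (let [[val-to-q ch] (async/alts!
                            (if (seq q)
                              [input [output (peek q)]]
                              [input]))]
        (swap! stats update-stats-as-you-see-fit q)
        (cond
          ; Write happened. Checked first because current core.async
          ; returns true (not nil) as the value of a completed put.
          (identical? ch output) (recur (pop q))
          ; Read a value from input.
          (some? val-to-q) (recur (conj q val-to-q))
          ; Input channel is closed. => drain queue.
          :else (doseq [v q] (async/>! output v)))))))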

(defn queue-process-controlled
  [input stats]
  (let [output  (async/chan)
        process (queue-process-uncontrolled input output stats)]
    (async/go
      ; Wait until the queue process has drained, then close output.
      (async/<! process)
      (async/close! output))
    output))
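
; Usage sketch, not part of the original gist. It assumes core.async is
; required under the alias `async`. The stats function below is a
; hypothetical stand-in for update-stats-as-you-see-fit: swap! calls it
; as (f current-stats q), and here it only tracks the longest queue seen.
; onto-chan! needs core.async 1.2 or later; older releases name it
; onto-chan. Evaluate the forms one by one at a REPL.
(comment
  (defn update-stats-as-you-see-fit
    [stats q]
    (update stats :max-queue-length (fnil max 0) (count q)))

  (def stats (atom {}))
  (def input (async/chan))
  (def output (queue-process-controlled input stats))

  ; Feed a few values; onto-chan! closes input afterwards, which makes
  ; the process drain its queue and the wrapper close output.
  (async/onto-chan! input [1 2 3])

  ; Collect everything that arrives on output until it is closed.
  (async/<!! (async/into [] output))

  ; Inspect what the stats function recorded.
  @stats)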