@cgrand
Created January 17, 2014 09:50
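(require '[clojure.core.async :as async])

(defn queue-process
  "Buffers values from `input` in an unbounded queue and forwards them to
  `output`. `stats` is an atom updated on every loop iteration;
  `update-stats-as-you-see-fit` is a placeholder the caller must define."
  [input output stats]
  (async/go
    ; With an empty queue we only wait for input; otherwise we also offer
    ; the head of the queue to output.
    (loop [q clojure.lang.PersistentQueue/EMPTY]
      (let [[val-to-q ch] (async/alts!
                            (if (seq q)
                              [input [output (peek q)]]
                              [input]))]
        (swap! stats update-stats-as-you-see-fit q)
        (cond
          ; Write happened. Check the channel first: a completed put makes
          ; alts! return true as the value, which must not be enqueued.
          (identical? ch output) (recur (pop q))
          ; Read a value from input (false is a legal value, nil is not).
          (some? val-to-q) (recur (conj q val-to-q))
          ; Input channel is closed. => drain queue.
          ; Is the assumption that the process owns output right?
          :else (do
                  (doseq [v q] (async/>! output v))
                  (async/close! output)))))))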
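
;; The loop above has to tell three alts! outcomes apart: a value taken from
;; input, a take from a closed input, and a completed put on output.
;; A minimal REPL sketch of what each one returns, assuming a recent
;; core.async release in which a completed put yields true (older releases
;; may differ). The channels `in` and `out` are illustrative only and use
;; small buffers so the blocking calls return immediately.
(require '[clojure.core.async :as async])

(let [in  (async/chan 1)
      out (async/chan 1)]
  ;; A completed put yields [true port].
  (assert (= [true out] (async/alts!! [[out :v]])))
  ;; A completed take yields [value port].
  (async/>!! in :a)
  (assert (= [:a in] (async/alts!! [in])))
  ;; A take from a closed, empty channel yields [nil port].
  (async/close! in)
  (assert (= [nil in] (async/alts!! [in]))))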