@alandipert
Created September 13, 2011 16:45
serial I/O with blocking queue and trampoline
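(defn serial-processor
  "Given BlockingQueue q, returns a map of two functions, :process and :stop.

  :process takes a no-argument function (presumably for side effects) and
  adds it to the queue, where it is invoked serially with respect to
  other functions added via :process.

  :stop takes a non-function value to return, blocks on the completion
  of the process functions queued before it, and returns the value."
  [^java.util.concurrent.BlockingQueue q]
  ;; Wrapping .take in a thunk lets trampoline do the first take itself, so
  ;; calling :stop before any :process returns the value instead of trying
  ;; to invoke it.
  (let [processor (future (trampoline #(.take q)))]
    ;; Each queued wrapper runs f, then returns the next queue item for
    ;; trampoline to invoke; a non-function item ends the loop.
    {:process (fn [f] (.put q (fn [] (f) (.take q))))
     :stop    (fn [eoq] (.put q eoq) @processor)}))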
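(let [{:keys [process stop]} (serial-processor (java.util.concurrent.LinkedBlockingQueue.))
      ;; shovel work into the queue from several threads
      producers (doall (for [i (range 10)]
                         (future (process #(println i)))))]
  ;; wait until every producer has enqueued its work; anything put on the
  ;; queue after the stop value would never run
  (doseq [p producers] @p)
  ;; block on completion
  (println (stop "all done")))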