@thebusby
Created April 27, 2013 12:41
Example of how you can use reducers to run in parallel and generate a single output stream for IO/etc.
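For readers new to reducers, here is a minimal sketch of the kind of parallel fold the code below builds on. The name `total` and the input vector are illustrative assumptions, not part of the gist.

```clojure
(require '[clojure.core.reducers :as r])

;; r/fold can split a vector into chunks and reduce them in parallel on a
;; fork/join pool; r/map attaches the transformation without building an
;; intermediate collection. `total` is a hypothetical name for this sketch.
(def total
  (r/fold + (r/map inc (vec (range 1000)))))

(println total)
```

Because `+` is associative and `(+)` returns 0, it can serve as both the reducing and the combining function here.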
(require '[clojure.core.reducers :as r])

(defmacro thread
  "Execute the given s-expression in a new thread and return the started thread."
  [sexp]
  `(let [thread# (java.lang.Thread. (fn [] ~sexp))]
     (.start thread#)
     thread#))
(defn fold-into-lazy-seq
  "Accept a reducible collection and produce a lazy seq of its results.
  Order is *NOT* guaranteed!"
  ([rseq] (fold-into-lazy-seq nil rseq))
  ([q-size rseq]
   (let [q             (if q-size
                         (java.util.concurrent.LinkedBlockingQueue. (int q-size))
                         (java.util.concurrent.LinkedBlockingQueue.))
         put           #(.put q %)
         take          #(.take q)
         error-list    (atom [])                    ;; To store any exceptions that may occur
         sentinel      (java.util.UUID/randomUUID)  ;; So we can tell when we're done
         nil-surrogate (java.util.UUID/randomUUID)] ;; LinkedBlockingQueue won't accept nil values, so...
     ;; Enqueue data in another thread
     (thread (try (->> rseq
                       (r/map (fn [x]
                                (put (if (nil? x)
                                       nil-surrogate
                                       x))
                                1))
                       (r/fold +))
                  (catch Exception e ;; Record any exceptions
                    (swap! error-list conj e))
                  (finally ;; Add final sentinel
                    (put sentinel))))
     ;; Produce lazy-seq; rethrow the first recorded exception once the sentinel arrives
     (take-while #(or (not= % sentinel)
                      (if-not (empty? @error-list)
                        (throw (first @error-list))))
                 (repeatedly #(let [v (take)]
                                (if (= v nil-surrogate)
                                  nil
                                  v)))))))
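The sentinel and nil-surrogate handoff used above can be seen in isolation in the following minimal sketch. It assumes a single producer thread and no reducers, so unlike the function above it preserves order. The names `drain-until` and `producer->lazy-seq` are hypothetical and not part of the gist.

```clojure
(import 'java.util.concurrent.LinkedBlockingQueue)

;; Hypothetical helper: lazily take from the queue until the sentinel
;; object appears, mapping the nil surrogate back to nil.
(defn drain-until
  [^LinkedBlockingQueue q sentinel nil-surrogate]
  (->> (repeatedly #(.take q))
       (take-while #(not (identical? % sentinel)))
       (map #(if (identical? % nil-surrogate) nil %))))

;; Hypothetical helper: push xs through a small bounded queue from a
;; producer thread and return a lazy seq of what comes out.
(defn producer->lazy-seq
  [xs]
  (let [q             (LinkedBlockingQueue. 2) ;; bounded: producer blocks when full
        sentinel      (Object.)                ;; marks the end of the stream
        nil-surrogate (Object.)                ;; stands in for nil, which the queue rejects
        producer      (fn []
                        (try
                          (doseq [x xs]
                            (.put q (if (nil? x) nil-surrogate x)))
                          (finally
                            (.put q sentinel))))]
    (.start (Thread. ^Runnable producer))
    (drain-until q sentinel nil-surrogate)))

(println (vec (producer->lazy-seq [1 nil 3])))
```

The bounded queue gives back-pressure: the producer blocks in `.put` until the consumer realizes more of the lazy seq, which keeps memory use roughly constant for large inputs.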