Skip to content

Instantly share code, notes, and snippets.

@ghadishayban
Last active December 17, 2015 05:59
Show Gist options
  • Save ghadishayban/5561954 to your computer and use it in GitHub Desktop.
Save ghadishayban/5561954 to your computer and use it in GitHub Desktop.
Experiments with a parallel ->> macro
(ns fancie-queues
(:import [java.util.concurrent BlockingQueue
LinkedBlockingQueue]))
;; see queue-chain at the bottom
;; (queue-chain (range 20) (map (partial * 5)) (map vector) (map pr-str))
;; (queue-chain (range 20) (remove odd?) (map pr-str))
;; for more fun:
;; (queue-chain (range 20) (remove odd?) (map (fn [i] (Thread/sleep 150) (pr-str i))))
;; Caveats: throwing an exception in the middle of the chain will not be fun
;; (each stage runs on a future, so nothing reports the failure), and the
;; queues don't accept nil items.
(defn queue-seq
  "Returns a lazy sequence of the items taken from the blocking queue `q`.
  Each element is obtained with a blocking `.take` only when it is realized;
  the sequence ends, without yielding it, as soon as the object identical to
  `sentinel` is taken. Nothing is taken from the queue after the sentinel."
  [^BlockingQueue q sentinel]
  (let [take-one  (fn [] (.take q))
        not-done? (fn [item] (not (identical? item sentinel)))]
    ;; `repeatedly` is lazy and unchunked, so exactly one `.take` happens per
    ;; realized element and `take-while` stops pulling at the sentinel.
    (take-while not-done? (repeatedly take-one))))
(defn- feeder
  "Puts every item of `coll` onto the blocking queue `q`, in order, and then
  puts `sentinel` to mark the end of the stream. Returns nil.

  The sentinel is enqueued in a `finally`, so it is still delivered when
  consuming `coll` throws (for example a failing upstream step, or the
  NullPointerException raised by putting a nil item). Without it, a consumer
  blocked in `.take` on `q` would wait forever. The exception itself is
  rethrown to the caller after the sentinel has been queued."
  [coll ^BlockingQueue q sentinel]
  (try
    ;; `reduce` is kept (rather than `doseq`) so that collections which are
    ;; reducible but not seqable continue to work.
    (reduce
      (fn [_ item]
        (.put q item))
      nil
      coll)
    (finally
      (.put q sentinel))))
;; Fallback capacity for the LinkedBlockingQueues created by queue-chain*,
;; used when neither the options map nor an expression's ^:queue-size
;; metadata supplies a size.
(def ^:private default-queue-size 5000)
(defn- fn-builder
  "Macro helper: turns one threading expression into the code for a stage
  function of three arguments `[out-queue input-seq sentinel]`.

  The generated function threads `input-seq` into the last position of `expr`
  (as `->>` does) and feeds the result onto `out-queue`, followed by
  `sentinel`. The metadata of `expr` (e.g. `^:queue-size`) is attached to the
  generated function so it can be read at runtime.

  `expr` may be a list form such as `(map inc)` or, as with `->>`, a bare
  symbol such as `distinct`, which is treated as `(distinct)`."
  [expr]
  (let [call (if (seq? expr) expr (list expr))]
    `(with-meta
       (fn [q# qseq# sentinel#]
         ;; `feeder` is private: refer to it through its var so the expansion
         ;; also compiles when the macro is used from another namespace.
         (#'feeder (~@call qseq#) q# sentinel#))
       ~(meta expr))))
(defn- queue-chain*
  "Macro helper that builds the expansion for `queue-chain`.

  `opts`  - a map form; its `:queue-size` (if any) is the default capacity for
            every queue in the chain.
  `exprs` - the initial collection form followed by one or more threading
            expressions. Any of them may carry `^:queue-size` metadata to
            override the capacity of the queue it writes to.

  The generated code starts one future that feeds the initial collection into
  a queue, then one future per threading expression, each reading the lazy
  sequence of the previous queue and writing to its own. It evaluates to the
  lazy sequence over the last queue.

  Throws ex-info at expansion time when fewer than two forms are supplied."
  [opts exprs]
  (when-not (> (count exprs) 1)
    (throw (ex-info "Must have at least one initial collection and threading expression" {})))
  (let [[source & stages] exprs]
    ;; `default-queue-size` and `feeder` are private to this namespace, so the
    ;; expansion must not mention them by plain symbol: the constant's value is
    ;; spliced in now and `feeder` is reached through its var. This keeps the
    ;; macro usable from other namespaces.
    `(let [fallback-size# (or (:queue-size ~opts) ~default-queue-size)
           sentinel# (Object.)
           initial-q# (LinkedBlockingQueue. (or ~(-> source meta :queue-size)
                                                fallback-size#))]
       (future (#'feeder ~source initial-q# sentinel#))
       (reduce
         ;; auto-gensyms instead of fixed symbols, so no user-visible names
         ;; are introduced by the expansion
         (fn [upstream# stage#]
           (let [out-q# (LinkedBlockingQueue.
                          (or (-> stage# meta :queue-size)
                              fallback-size#))]
             (future (stage# out-q# upstream# sentinel#))
             (queue-seq out-q# sentinel#)))
         (queue-seq initial-q# sentinel#)
         ~(mapv fn-builder stages)))))
(defmacro queue-chain
  "A parallel take on clojure.core/->>: the first form is the initial
  collection and every following expression runs on its own future, with
  blocking queues carrying items from one expression to the next.

  If the first argument is a map it is treated as options (`:queue-size`
  sets the default queue capacity) and the chain starts at the second form.
  Individual expressions may carry ^:queue-size metadata.

  nil values cannot flow through the chain because the queues reject them."
  [& exprs]
  (let [[head & tail] exprs]
    (if-not (map? head)
      (queue-chain* {:queue-size default-queue-size} exprs)
      (queue-chain* head tail))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment