Skip to content

Instantly share code, notes, and snippets.

@dvliman
Created July 2, 2018 07:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dvliman/d86f92822ef1584ed33d6753f426f7f7 to your computer and use it in GitHub Desktop.
Save dvliman/d86f92822ef1584ed33d6753f426f7f7 to your computer and use it in GitHub Desktop.
pipejine-core
(ns pipejine.core
(:require [clojure.tools.logging :as log]
[clojure.stacktrace :as st])
(:import [java.util.concurrent LinkedBlockingQueue CountDownLatch TimeUnit]
[org.slf4j MDC]))
(defn new-queue
  "Create and initialize a new queue.

  Options map (every key optional):
    :name                        label used in log output; defaults to a fresh
                                 gensym such as queue-1234
    :queue-size                  capacity of the underlying LinkedBlockingQueue (default 1)
    :number-of-consumer-threads  consumer threads spawn-consumers starts (default 1)
    :number-of-producers         producers expected to call produce-done (default 1)
    :partition                   0 = hand items to the consumer fn one by one,
                                 N = hand over vectors of N items,
                                 :all = gather everything (one consumer thread only!)
                                 (default 0)
    :time-out                    offer/poll timeout in milliseconds (default 500)"
  [{:keys [name queue-size number-of-consumer-threads number-of-producers partition time-out]
    ;; NOTE: the default for `name` must not refer to `name` itself -- inside :or it
    ;; is not bound yet and would resolve to clojure.core/name (a function).
    :or {name (gensym "queue-"), queue-size 1, number-of-consumer-threads 1,
         number-of-producers 1, partition 0, time-out 500}}]
  {:q              (LinkedBlockingQueue. queue-size)
   :name           name
   :consumers-done (CountDownLatch. number-of-consumer-threads)
   :producers-done (CountDownLatch. number-of-producers)
   :part           partition ;; :all for gathering everything (one consumer thread only!)
   :thread-num     number-of-consumer-threads
   :run            (atom true)
   :time-out       time-out})
(defn produce
  "Put datum d on queue q.

  Each attempt waits up to the queue's time-out (ms) for free space and is
  retried while the queue stays full. Attempts stop as soon as the queue's run
  flag is cleared, so a producer never blocks on an aborted queue. nil/false
  data is never queued. Returns nil."
  [queue d]
  (let [{:keys [^LinkedBlockingQueue q run time-out]} queue]
    (when d
      (loop []
        (when (and @run
                   (not (.offer q d time-out TimeUnit/MILLISECONDS)))
          (recur))))))
(defn produce-done
  "Signal on queue q that one producer has finished producing.
  Each producer should call this exactly once. Returns nil."
  [q]
  (let [^CountDownLatch latch (:producers-done q)]
    (.countDown latch)))
(defn shutdown
  "Abort queue q.

  Clears its run flag, which makes produce stop offering and consumers stop
  polling. Then counts the producers-done latch all the way down so that any
  supervisor waiting on it is released."
  [q]
  (let [{:keys [run ^CountDownLatch producers-done]} q]
    (reset! run false)
    ;; drain latch to release the supervisors
    (loop []
      (when (pos? (.getCount producers-done))
        (produce-done q)
        (recur)))))
(defn- consumer
  "Consumer loop; spawn-consumers runs one of these per consumer thread.

  Polls q, waiting at most time-out ms per poll, and hands data to f according
  to part:
    0           -> f is called with each item individually;
    N           -> items are accumulated and f is called with a vector of N items;
    other value -> (one the count never equals, e.g. :all) items accumulate
                   until the loop stops.

  The loop stops when the run flag is cleared, or when every producer has
  called produce-done and q is empty. A partially filled partition is passed
  to f before returning. Anything thrown, including Errors raised by f, is
  logged. The consumers-done latch is always counted down once on exit."
  [{:keys [^LinkedBlockingQueue q
           ^CountDownLatch consumers-done
           ^CountDownLatch producers-done
           part run time-out]} f]
  (let [deliver (fn [d acc]
                  (let [acc (if d (conj acc d) acc)]
                    (cond
                      ;; No partitioning, just deliver d
                      (and (number? part) (zero? part))
                      (do
                        (when d (f d))
                        [])
                      ;; Partition filled, deliver it
                      (= (count acc) part)
                      (do
                        (f acc)
                        [])
                      ;; Keep accumulating
                      :else
                      acc)))
        stop? (fn []
                (or
                 (not @run) ;; stop flag
                 ;; producers done and q empty. The latch is read before the
                 ;; size; this assumes producers stop producing once they call
                 ;; produce-done.
                 (and (zero? (.getCount producers-done))
                      (zero? (.size q)))))]
    (try
      (loop [acc []]
        (let [d (.poll q time-out TimeUnit/MILLISECONDS)
              acc (deliver d acc)]
          (if (stop?)
            (when (seq acc) (f acc)) ;; deliver any outstanding data before quitting
            (recur acc))))
      ;; Throwable rather than Exception: an Error thrown by f (e.g. an
      ;; AssertionError) would otherwise end up in the enclosing future, which
      ;; nobody derefs, and never be reported.
      (catch Throwable e
        (let [writer (java.io.StringWriter.)]
          (binding [*out* writer]
            (st/print-stack-trace e)
            (log/error (str writer)))))
      (finally
        (.countDown consumers-done)))))
(defn- supervisor
  "Block until every consumer thread of q has finished.

  Then shut q down, which also drains its producers-done latch, wait on that
  latch, and finally call the no-arg function f."
  [q f]
  (let [^CountDownLatch consumers (:consumers-done q)
        ^CountDownLatch producers (:producers-done q)]
    (.await consumers)
    ;; if all consumers have died, we shutdown the queue
    (shutdown q)
    (.await producers)
    (f)))
(defn spawn-consumers
  "Spawn consumer threads for a queue; function f will be called on each data item
  (or each partition, when the queue partitions) consumed.

  Starts thread-num futures. While its consumer runs, each one tags its logging
  MDC with \"pipejine.q\" = \"<name>-<NN>\". Returns nil."
  [{:keys [name thread-num] :as q} f]
  (dotimes [num thread-num]
    (future
      (MDC/put "pipejine.q" (format "%s-%02d" name num))
      (try
        (consumer q f)
        (finally
          ;; MDC is thread-local and future threads are reused from Clojure's
          ;; shared pool, so clear the key or it leaks into unrelated tasks.
          (MDC/remove "pipejine.q"))))))
(defn spawn-supervisor
  "Start a supervisor for queue q on a future thread.

  Once all of q's consumer threads are done, the supervisor shuts q down and
  calls the no-arg function f (exactly once per supervisor). Multiple
  supervisors may be spawned for the same queue. Returns the future."
  [q f]
  (let [task (fn [] (supervisor q f))]
    (future-call task)))
(defn producer-of
  "Register q1 as a producer of every queue in qs.

  Once q1's consumers have all finished, produce-done is signalled once on each
  queue in qs. Returns the supervisor's future."
  [q1 & qs]
  (let [signal-downstream (fn [] (run! produce-done qs))]
    (spawn-supervisor q1 signal-downstream)))
;; -------------------------------------------
;; Helpers
(defn prod-fn
  "Returns a one-argument function that produces its argument into queue q
  (via produce)."
  [q]
  (fn produce-into [item]
    (produce q item)))
(defn read-seq
  "Returns a lazy-seq with data consumed from a q. To be used *INSTEAD OF* spawn-consumers.

  Spawns q's consumer threads, which hand their output to a private unbounded
  queue (so q's partitioning still applies), plus a supervisor. The seq ends
  only when every consumer thread has finished AND everything they handed over
  has been read. Data delivered just before the queue shuts down is therefore
  not dropped; this includes a final :all batch."
  [{:keys [^CountDownLatch consumers-done time-out] :as q}]
  (let [nq (LinkedBlockingQueue.) ;; we need a new queue here in order to use q's partitioning
        ;; The latch is checked first: once it is zero no consumer can add to
        ;; nq any more, so an empty nq means the stream is complete.
        finished? (fn []
                    (and (zero? (.getCount consumers-done))
                         (.isEmpty nq)))]
    (spawn-consumers q (fn [d] (.put nq d)))
    (spawn-supervisor q (constantly true))
    ((fn step []
       (lazy-seq
        (loop []
          (if-let [d (.poll nq time-out TimeUnit/MILLISECONDS)]
            (cons d (step))
            (when-not (finished?)
              (recur)))))))))
(defn chain-queues
  "Wire the queues qs into a linear (non-branching) pipeline.

  When each queue's consumers have all finished, one produce-done is signalled
  on the next queue in qs. The no-arg function f is called once the last queue
  has been fully consumed. Returns the last supervisor's future."
  [f & qs]
  (doseq [[upstream downstream] (map vector qs (rest qs))]
    (spawn-supervisor upstream #(produce-done downstream)))
  (spawn-supervisor (last qs) f))
(defn spawn-logger
  "Spawn a watcher thread of supplied queues. Will stop when all qs are shut down or
  the returned shutdown function is called.

  Roughly once a second it logs (info level) one line per queue:
    name, [r] run flag, [q] items waiting in the queue,
    [p#] producers-done latch count, [c#] consumers-done latch count.
  Returns a no-arg function that stops the watcher."
  [& qs]
  (let [running (atom true)
        log-fn (fn []
                 (while (and @running (some #(-> % :run deref) qs))
                   (Thread/sleep 1000)
                   (log/info
                    (apply str
                           "------------------------------\n"
                           ;; type hints avoid reflective calls on every tick
                           (for [{:keys [name run
                                         ^LinkedBlockingQueue q
                                         ^CountDownLatch producers-done
                                         ^CountDownLatch consumers-done]} qs]
                             (format "%-15.15s [r]%-5.5b [q]%-4d [p#]%-2d [c#]%-2d\n"
                                     name @run
                                     (.size q) (.getCount producers-done)
                                     (.getCount consumers-done)))))))]
    (.start (Thread. ^Runnable log-fn))
    (fn [] (reset! running false))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment