Skip to content

Instantly share code, notes, and snippets.

@arichiardi
Created March 31, 2022 15:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save arichiardi/fabd517bbf3a19416e0741cec7b36bf9 to your computer and use it in GitHub Desktop.
Save arichiardi/fabd517bbf3a19416e0741cec7b36bf9 to your computer and use it in GitHub Desktop.
Very simple parallel testing utility
(ns test.parallel
(:require
[clojure.core.async :as clj-async :refer [>! go]])
(:import
java.util.concurrent.CountDownLatch
java.util.concurrent.TimeUnit))
(def ^:dynamic *latch-timeout*
  "How long to wait for every worker to finish, i.e. the timeout applied to
  the \"done\" CountDownLatch. The amount is expressed in
  `*latch-timeout-unit*`."
  3)
(def ^:dynamic *latch-timeout-unit*
  "The java.util.concurrent.TimeUnit in which `*latch-timeout*` is expressed
  when waiting for all the threads to finish (done CountDownLatch)."
  TimeUnit/MINUTES)
(def ^:dynamic *max-threads*
  "Number of parallel workers started by `run-race`."
  10)
(def ^:dynamic *max-timeout-ms*
  "Upper bound for a test timeout, expressed in milliseconds."
  10000)
(def ^:dynamic *print-execution-log*
  "Flag stating whether the execution log should be printed."
  true)
(defn format-thread-prefix
  "Builds the log-line prefix for the worker identified by `thread-id`,
  e.g. \"[thd #3]\"."
  [thread-id]
  (let [template "[thd #%s]"]
    (format template thread-id)))
(defn done-latch-timeout-ex-info
  "Builds (does not throw) the ex-info reported when the workers fail to
  finish in time. The message states the configured timeout converted to
  seconds; the ex-data carries the raw `*latch-timeout*` and
  `*latch-timeout-unit*` values."
  []
  (let [unit    *latch-timeout-unit*
        amount  *latch-timeout*
        seconds (.toSeconds ^TimeUnit unit amount)
        message (format "Timed out waiting for all threads to finish (> %s seconds)"
                        seconds)]
    (ex-info message
             {:latch-timeout amount :latch-timeout-unit unit})))
(defn result-timeout-ex-info
  "Builds (does not throw) the ex-info reported when waiting for the threads'
  results takes longer than `*max-timeout-ms*`. The ex-data carries that
  value under `:max-timeout-ms`."
  []
  ;; *max-timeout-ms* is a millisecond amount, so the message must say so
  ;; rather than labelling the raw number as seconds.
  (ex-info (format "Timed out while waiting for threads results (> %s milliseconds)"
                   *max-timeout-ms*)
           {:max-timeout-ms *max-timeout-ms*}))
(defn run-race
  "Run functions in parallel, releasing all of them at the same time.

  Starts `*max-threads*` workers. Worker `i` calls the zero-argument function
  found at index `(mod i (count fns))`, so the given functions are handed out
  round-robin. Every worker first blocks on a shared start latch and is
  released together with the others.

  Returns a `{:results [] :exceptions []}` map holding the functions' return
  values and the Throwables they raised, in the order the events were read
  back from the result channel.

  When `*print-execution-log*` is truthy it also prints an execution log.

  Throws the ex-info built by `done-latch-timeout-ex-info` when the workers
  have not all finished within `*latch-timeout*` `*latch-timeout-unit*`."
  [& fns]
  (let [workers     *max-threads*
        ;; Each worker emits at most three events (started, result or
        ;; exception, ended), so this buffer keeps the puts from blocking
        ;; even though the channel is drained only after all workers ended.
        result-chan (clj-async/chan (* 10 workers))
        start-latch (CountDownLatch. 1)
        done-latch  (CountDownLatch. workers)
        fns         (vec fns)
        n           (count fns)
        log         (fn [& args]
                      (when *print-execution-log*
                        (apply println args)))]
    (dotimes [i workers]
      ;; Real threads instead of `go` blocks: the workers block on the start
      ;; latch and run arbitrary (possibly blocking) functions, which must
      ;; not happen on the shared go-block dispatch pool.
      (clj-async/thread
        (try
          (.await start-latch)
          (clj-async/>!! result-chan {:type :log :thread i :msg "started"})
          (let [f (get fns (mod i n))]
            (clj-async/>!! result-chan {:type :result :thread i :payload (f)}))
          (catch Throwable e
            (clj-async/>!! result-chan {:type :exception :thread i :exception e}))
          (finally
            ;; Report "ended" before counting down, so the event is already
            ;; buffered when the caller closes the channel.
            (clj-async/>!! result-chan {:type :log :thread i :msg "ended"})
            (.countDown done-latch)))))
    (log "Start thread execution")
    (.countDown start-latch)
    (log (format "Waiting for %s threads to be done" (.getCount done-latch)))
    (if (.await done-latch (long *latch-timeout*) ^TimeUnit *latch-timeout-unit*)
      (do (log "All threads done")
          (clj-async/close! result-chan))
      (throw (done-latch-timeout-ex-info)))
    ;; The channel is closed at this point: drain the buffered events until
    ;; <!! yields nil, accumulating results and exceptions along the way.
    (loop [acc {:results [] :exceptions []}]
      (if-let [event (clj-async/<!! result-chan)]
        (let [prefix (format-thread-prefix (:thread event))]
          (recur
           (case (:type event)
             :log       (do (log prefix (:msg event))
                            acc)
             ;; Only the Throwable's string form is logged; the Throwable
             ;; itself is returned to the caller under :exceptions.
             :exception (let [ex (:exception event)]
                          (log prefix (str ex))
                          (update acc :exceptions conj ex))
             :result    (let [payload (:payload event)]
                          (log prefix payload)
                          (update acc :results conj payload)))))
        (do (log "Result channel closed")
            acc)))))
(comment
  ;; REPL smoke test. With two functions the workers alternate between them:
  ;; half of the runs return a map holding a distinct counter value and the
  ;; other half throw.
  (require 'clojure.test)
  (binding [*max-threads* 50]
    (let [state (atom 0)
          {:keys [results exceptions]}
          (run-race
           #(hash-map :state (swap! state inc))
           #(throw (ex-info "Oh noes!" {})))]
      ;; Both operands must sit inside `=`; a one-argument `=` is always true.
      (clojure.test/is (= (/ *max-threads* 2) (count (distinct results))))
      (clojure.test/is (= (/ *max-threads* 2) (count exceptions)))))
  )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment