Skip to content

Instantly share code, notes, and snippets.

@Frozenlock
Created January 30, 2021 22:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Frozenlock/e0c8b8f81838182fcf36ae14e7dfea5d to your computer and use it in GitHub Desktop.
Save Frozenlock/e0c8b8f81838182fcf36ae14e7dfea5d to your computer and use it in GitHub Desktop.
(defn -inheritable*
[var]
(let [*i (doto (proxy [InheritableThreadLocal] []
;; This is where the magic happens.
;; childValue is evaluated in the parent thread where bindings should still valid.
(childValue [{:keys [bindings]}]
(let [new-bindings (merge bindings (get-thread-bindings))]
{:bindings new-bindings
:get-val (fn [] (with-bindings new-bindings
(deref var)))})))
(.set {:get-val #(deref var)}))]
(reify clojure.lang.IDeref
(deref
[this]
;; If a binding exists, we are in a Clojure-compatible thread.
(if (contains? (get-thread-bindings) var)
{:value (deref var)
:source :bindings}
;; Otherwise try to use the inheritable variable.
(if-some [get-val (:get-val (.get *i))]
{:value (get-val)
:source :inheritable}
;; Lastly, default to the var initial value.
;; Even if we find ourselves in a 'lost' thread, we can still get the data stored into the atom.
{:value (deref var)
:source :default}))))))
;; Use an atom to easily change 'root' data.
(def ^:dynamic -*data (atom {}))
(defn set-data
[k v]
(swap! -*data assoc k v))
;; ^ use above function to set init values.
;; Ex:
;; (set-data :dsn ...)
;; (set-data :release ...)
(def -*inheritable (-inheritable* #'-*data))
;; Need an indirection layer to configure existing thread pools
(def -thread-local-inheritable
(ThreadLocal.))
(defn -get-inheritable
[]
(if-let [tl-inheritable (.get -thread-local-inheritable)]
tl-inheritable
-*inheritable))
(defn get-data
[]
(deref (:value (deref (-get-inheritable)))))
(defn -init-inheritable-in-clojure-thread-pool
[]
(let [max-count 100
init (->> (partition 20 (pmap (fn [_]
(let [source (:source (deref (-get-inheritable)))]
(if (= source :default)
(do (.set -thread-local-inheritable (-inheritable* #'-*data))
source)
source))) (range)))
(take-while #(some (fn [s] (= s :default)) %))
(take max-count))]
(when (= (count init) 100)
(throw (ex-info "Couldn't initialize inheritable var")))))
;; HACK: force existing threads in Clojure pool to define the inheritable variable.
(-init-inheritable-in-clojure-thread-pool)
;; Alternatively, here's a suggestion from Alex Miller:
;; You can set your own thread pool to use
;; http://clojure.github.io/clojure/clojure.core-api.html#clojure.core/set-agent-send-executor!
;; And the send-off equivalent (which future uses)
;; I would consider these hostile in a library, but ok in an app
;; This macro has a dual purpose:
;; 1. Insert new data into the bindings
;; 2. Use inherited values to place new bindings when coming from a non-clojure thread.
(defmacro with-data
[data & body]
`(let [data# (merge (get-data) ~data)]
(with-bindings {#'-*data
;; Atom not used here to avoid confusion (not resetable)
(reify clojure.lang.IDeref
(deref [_]
data#))}
~@body)))
;;; How it works
(comment
(future ; use default value or bindings
(future ; use default value or bindings
(thread ; use inheritable
(thread ; use inheritable
(with-data {} ; Value from inheritable is put back into bindings
(future)) ; value from bindings
(future))))) ; FALLBACK to initial value
)
;;; Examples
(comment
(with-data {:A 0} (pmap (fn [_] (with-data {:a 1} (get-data))) (range 10)))
(defmacro start-thread
[& body]
`(let [thread# (Thread. ^Runnable (fn [] ~@body))]
(.start thread#)
thread#))
(with-data {:1 :1}
(future
(with-data {:2 :2}
(future
(start-thread
(with-data {:3 :3}
(println (get-data))
(start-thread
(println (get-data))
(start-thread
(println (get-data))))))))))
(with-data {:z :z}
(future
(with-data {:a :a}
(future
(start-thread
(println (get-data))
(start-thread
(println (get-data))
(start-thread
(with-data {}
(future
(println (get-data)))))))))))
(import java.util.concurrent.Executors java.util.concurrent.ExecutorService)
(let [^ExecutorService executor (Executors/newFixedThreadPool 2)
p1 (promise)
p2 (promise)]
(with-data {:a 1}
(.submit executor ^Callable (fn []
(deliver p1 (get-data))
(with-data {:b 2}
(.submit executor ^Callable (fn []
(deliver p2 (get-data)))))))
(let [result [@p1 @p2]]
(.shutdown executor)
result)))
(with-data {:a 1}
(future
(println "Future " (get-data))
(with-data {:b 2}
(start-thread
(println "Thread " (get-data))
(with-data {:c 3}
(start-thread
(println "Thread " (get-data))
(future
(println "Last future " (get-data))))))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment