Drop-in replacement for clojure.core/atom that records the number of successful updates and the number of update attempts
(ns com-widdindustries-statsatom
  "A drop-in replacement for clojure.core/atom.

  The difference is that this records the following stats, held in the atom's metadata:
  - number of times updated via swap!/swap-vals! and
  - number of attempts made to do those updates

  Clearly these stats themselves need to be atomically updated, so there's some overhead added there."
  (:refer-clojure :exclude [atom])
  (:import (clojure.lang IAtom2 ARef)
           (java.util.concurrent.atomic AtomicReference AtomicLong)))
(defn- validate
  "Checks `new-v` against the validator installed on `this`, if any.

  Returns nil when there is no validator or the validator returns a truthy
  value. Throws IllegalStateException \"Invalid reference state\" when the
  validator returns a falsey value.

  Exceptions thrown by the validator itself are handled the way
  clojure.lang.ARef#validate handles them: RuntimeExceptions propagate
  unchanged, any other Exception is wrapped in an IllegalStateException
  carrying the original as its cause."
  [^ARef this new-v]
  (when-let [validator (.getValidator this)]
    (let [ok? (try
                (boolean (validator new-v))
                ;; runtime exceptions are rethrown untouched
                (catch RuntimeException e
                  (throw e))
                ;; checked exceptions are wrapped, keeping the cause
                (catch Exception e
                  (throw (IllegalStateException. "Invalid reference state" e))))]
      (when-not ok?
        (throw (IllegalStateException. "Invalid reference state"))))))
(defn- bod
  "Compare-and-set retry loop used by both swap and swapVals.

  Each iteration reads the current value with `(.deref this)`, applies `f`
  to it, validates the result and attempts to install it into `state`.
  When the compare-and-set fails the loop starts over. On success the
  watches of `this` are notified and `[old-value new-value]` is returned.

  Stats: the `::attempts` counter found in the metadata of `this` is
  incremented on every iteration, `::updates` once on success. A counter
  that is absent from the metadata (e.g. after `reset-meta!` replaced the
  map) is skipped, so the update still works instead of failing with a
  NullPointerException."
  [^ARef this ^AtomicReference state f]
  (let [{::keys [^AtomicLong attempts ^AtomicLong updates]} (.meta this)]
    (loop []
      ;; count the attempt before running f, so failed validations and
      ;; lost races are included
      (when attempts
        (.incrementAndGet attempts))
      (let [v (.deref this)
            new-v (f v)]
        (validate this new-v)
        (if (.compareAndSet state v new-v)
          (do
            (when updates
              (.incrementAndGet updates))
            (.notifyWatches this v new-v)
            [v new-v])
          (recur))))))
(defn- just-new-bod
  "Performs the update via `bod` and returns only the second element of
  its result (the new value), discarding the old one."
  [this state f]
  (nth (bod this state f) 1 nil))
(defn- stats-atom
  "Builds a proxy extending ARef and implementing IAtom2 whose value is
  held in the AtomicReference `state`.

  swap / swapVals delegate to `just-new-bod` / `bod`.
  compareAndSet, reset and resetVals validate the new value, update
  `state` and notify watches."
  [^AtomicReference state]
  (proxy [ARef IAtom2] []
    (swap
      ([f] (just-new-bod this state f))
      ([f x] (just-new-bod this state #(f % x)))
      ([f x y] (just-new-bod this state #(f % x y)))
      ([f x y more]
       (just-new-bod this state #(apply f % x y more))))
    (swapVals
      ([f] (bod this state f))
      ([f x] (bod this state #(f % x)))
      ([f x y] (bod this state #(f % x y)))
      ([f x y more] (bod this state #(apply f % x y more))))
    (deref [] (.get state))
    (compareAndSet [v new-v]
      (validate this new-v)
      (if (.compareAndSet state v new-v)
        (do (.notifyWatches this v new-v)
            true)
        false))
    (reset [new-v]
      (let [v (.deref this)]
        (validate this new-v)
        (.set state new-v)
        (.notifyWatches this v new-v)
        new-v))
    (resetVals [new-v]
      (validate this new-v)
      ;; getAndSet swaps atomically, so `v` is guaranteed to be the value
      ;; that was actually replaced even when other threads update
      ;; concurrently (a separate read followed by .set is not).
      (let [v (.getAndSet state new-v)]
        (.notifyWatches this v new-v)
        [v new-v]))))
(defn- add-stats
  "Associates two fresh AtomicLong counters into the metadata of `a`
  under `::attempts` and `::updates`, keeping any other metadata entries.
  Returns `a`."
  [a]
  (alter-meta! a assoc
               ::attempts (AtomicLong.)
               ::updates (AtomicLong.))
  a)
(defn atom
  "Drop-in replacement for clojure.core/atom.

  With a single argument, wraps `x` in an AtomicReference, builds the
  stats-recording atom around it and attaches the counters.
  With trailing `options`, the one-argument arity is used to build the
  atom, the options are applied through clojure.core's private
  `setup-reference`, and the counters are attached again afterwards
  (presumably because a :meta option replaces the metadata map — confirm
  against clojure.core/setup-reference)."
  ([x]
   (add-stats (stats-atom (AtomicReference. x))))
  ([x & options]
   (let [configured (#'clojure.core/setup-reference (atom x) options)]
     (add-stats configured))))
(comment
  ;; REPL scratchpad: forms inside `comment` are not evaluated on load;
  ;; evaluate them one at a time from an editor/REPL.

  ;; construct an atom, without and with options
  (def a (atom 1))
  (def a (atom 0 :validator (fn [x] (#{0 1} x)) :meta {:foo 1}))

  ;; watches, resets and swaps
  (add-watch a :foo (fn [& _] (println "hello")))
  (reset-vals! a 3)
  (reset! a 3)
  (swap! a inc)
  (swap! a + 2 3 4)

  ;; the counters are in the metadata
  (meta a)

  ;; contended swaps from several threads, each sleeping inside the
  ;; update function
  (def p (java.util.concurrent.Executors/newCachedThreadPool))
  (dotimes [n 10]
    (.submit p
             (fn []
               (swap! a
                      (fn [v]
                        (Thread/sleep (rand-int (* n 1000)))
                        n)))))
  ; eval (meta a) repeatedly to track progress
  )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This comment has been minimized.
depend on this from deps e.g.