Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Event Logging
(ns logevt-cl
(:refer-clojure :exclude [==])
(:use [clojure.core.logic])
(:import [java.util UUID]))
;; One relation per event attribute, each pairing an event id with a value.
;; Both columns of every relation carry ^:index metadata.
(defrel id-name ^:index id ^:index name)
(defrel id-time ^:index id ^:index time)
(defrel id-op ^:index id ^:index op)
(defrel id-cat ^:index id ^:index cat)
(defrel id-parent-op ^:index id ^:index parent-op)
;; Agent on which emit-timed asserts its facts; its state starts at 0 and is
;; incremented once per emitted event.
(def log-worker (agent 0))
(defn emit-timed
  "Records `data` as a new event.  A random UUID becomes the event id, the
  event name is (:name data) or, failing that, the id as a string, and the
  timestamp is read from System/nanoTime.  The :op, :cat and :parent-op
  entries of `data` are stored only when present (truthy).
  Returns the log-worker agent."
  [data]
  (let [event-id (UUID/randomUUID)
        label    (or (:name data) (str event-id))
        stamp    (System/nanoTime)
        store!   (fn [counter]
                   (fact id-name event-id label)
                   (fact id-time event-id stamp)
                   (when-let [op (:op data)]
                     (fact id-op event-id op))
                   (when-let [category (:cat data)]
                     (fact id-cat event-id category))
                   (when-let [parent (:parent-op data)]
                     (fact id-parent-op event-id parent))
                   (inc counter))]
    ;; The facts are asserted on the agent rather than on the caller's thread
    ;; to minimize the impact on the caller; (fact) might otherwise become a
    ;; point of congestion in multithreaded scenarios.
    (send log-worker store!)))
(defn all-deltaso
  "Goal relating two events recorded under the same `name`: a parent event
  whose op is `op` and whose timestamp is `op-time`, and a child event whose
  op is `child-op`, whose timestamp is `child-op-time` and whose parent-op
  is `op`."
  [name op child-op op-time child-op-time]
  (fresh [parent-id child-id]
    (id-name parent-id name)
    (id-name child-id name)
    (id-op parent-id op)
    (id-op child-id child-op)
    (id-time parent-id op-time)
    (id-time child-id child-op-time)
    (id-parent-op child-id op)))
(defn- merge-fn
  "Combining function for merge-with that collects samples into one flat
  vector.  Either argument may be a single sample or a vector of samples
  (the result of an earlier merge); vectors are concatenated instead of
  being nested inside each other."
  [a b]
  (into (if (vector? a) a [a])
        (if (vector? b) b [b])))
(defn get-all-deltas-with-name
  "Queries every parent/child event pair matched by all-deltaso and returns a
  map from [name op child-op] to the child timestamp minus the parent
  timestamp.  Results sharing a key are combined with merge-fn.
  Returns nil when the query yields no pairs."
  []
  (let [found (run* [q]
                (fresh [nm parent child parent-t child-t]
                  (all-deltaso nm parent child parent-t child-t)
                  ;; project exposes the bound values so that plain
                  ;; arithmetic can be done on the timestamps
                  (project [nm parent child parent-t child-t]
                    (== q {[nm parent child] (- child-t parent-t)}))))]
    (apply merge-with merge-fn found)))
(defn get-all-deltas
  "Same data as get-all-deltas-with-name, but with the name dropped from
  every key: returns a map from [op child-op] to the deltas seen under any
  name, combined with merge-fn."
  []
  (apply merge-with merge-fn
         (for [[[_ op child-op] v] (get-all-deltas-with-name)]
           {[op child-op] v})))
(defn- std-dev
  "Population standard deviation of `samples` (the squared deviations are
  divided by the sample count, not count - 1).
  Returns 0.0 for an empty or nil collection instead of dividing by zero."
  [samples]
  (if (empty? samples)
    0.0
    (let [n (count samples)
          mean (/ (reduce + samples) n)
          squared-diffs (map #(Math/pow (- % mean) 2) samples)]
      (Math/sqrt (/ (reduce + squared-diffs) n)))))
(defn gen-latency-report
  "Builds a latency report from `deltas`, a map whose values are either a
  single delta or a collection of deltas.  Every delta is divided by
  `factor` first; the result maps each key to its :max, :min, :count,
  :average and :std-deviation."
  [factor deltas]
  (into {}
        (for [[k raw] deltas]
          (let [scaled (map #(/ % factor) (if (coll? raw) raw [raw]))
                n (count scaled)]
            [k {:max (apply max scaled)
                :min (apply min scaled)
                :count n
                :average (/ (reduce + scaled) n)
                :std-deviation (std-dev scaled)}]))))
;; -------
(comment
;; REPL scratch: emit a sample chain of events under one generated name,
;; with random pauses between parent and child operations.
(defn log-some []
(let [name (gensym)
rnd (java.util.Random.)]
(emit-timed {:name name :op :add})
(emit-timed {:name name :cat :backend :op :send})
(Thread/sleep (.nextInt rnd 10))
(emit-timed {:name name :cat :backend :op :ack :parent-op :send})
(Thread/sleep (.nextInt rnd 20))
(emit-timed {:name name :cat :backend :op :sub :parent-op :send})
(emit-timed {:name name :cat :distribution :op :contribute})
(emit-timed {:name name :cat :distribution :op :chain-contribute})
(Thread/sleep (.nextInt rnd 50))
(emit-timed {:name name :cat :distribution :op :chain-event :parent-op :contribute})
(emit-timed {:name name :op :done :parent-op :add})
))
;; Number of events the log-worker agent has stored so far.
@log-worker
;; Run ten sample chains concurrently.
(doseq [i (range 10)]
(future (log-some)))
;; Report keyed by [name op child-op]; deltas are left unscaled (factor 1).
(->>
(get-all-deltas-with-name)
(gen-latency-report 1))
;; Report keyed by [op child-op]; every delta is divided by 1000000.0.
(->>
(get-all-deltas)
(gen-latency-report 1000000.0))
)
;; Some toying around with event logging, inspired by Mark McGranaghan's clojure/conj talk;
;; http://blip.tv/clojure/mark-mcgranaghan-logs-as-data-5953857
;; In-memory event log: an agent whose state is a map from an event's :id to
;; the events emitted under that id (see emit).  :error-mode :continue keeps
;; the agent accepting further actions after one of them throws.
(defonce ^:private log (agent {} :error-mode :continue))
(defn emit
  "Queues `data` to be added to the log under its :id.  The update runs
  asynchronously on the log agent; the agent itself is returned."
  [data]
  (let [id (:id data)]
    (send log update-in [id] conj data)))
(defn emit-timed
  "Emits `data` to the log with the current System/nanoTime reading stored
  under :time."
  [data]
  (let [stamped (assoc data :time (System/nanoTime))]
    (emit stamped)))
(defn get-ids
  "Returns the ids seen in `log` (its keys) as a sequence, or nil when the
  log holds no entries."
  [log]
  (some-> log keys))
(defn get-log
  "Returns the events stored under `id` in `log`, ordered by ascending :time.
  An id that is not in the log yields an empty sequence."
  [log id]
  (sort-by :time (log id)))
(defn- get-single-delta
  "Returns the (minimum) positive time delta between `evt` and the events in
  `log` that share evt's :cat and whose :op equals evt's :parent-op.
  Returns nil when no such earlier event exists, e.g. when the parent was
  never logged."
  [log evt]
  (let [deltas (for [parent log
                     :when (and (= (:cat evt) (:cat parent))
                                (= (:parent-op evt) (:op parent)))
                     :let [delta (- (:time evt) (:time parent))]
                     :when (pos? delta)]
                 delta)]
    ;; `min` requires at least one argument, so the no-parent case has to be
    ;; handled before applying it.
    (when (seq deltas)
      (apply min deltas))))

(defn- merge-fn
  "Combining function for merge-with that collects samples into one flat
  vector.  Either argument may be a single sample or a vector of samples
  (the result of an earlier merge); vectors are concatenated instead of
  being nested inside each other."
  [a b]
  (into (if (vector? a) a [a])
        (if (vector? b) b [b])))

(defn get-deltas
  "Get all deltas for a given id: a map from [cat op] of every event that
  names a :parent-op to its delta (or a vector of deltas when the same
  [cat op] occurs more than once).  Events whose parent cannot be found are
  skipped.  Returns nil when there is nothing to report."
  [log id]
  (let [events (get-log log id)]
    (->> events
         (filter :parent-op)
         (keep (fn [evt]
                 (when-let [delta (get-single-delta events evt)]
                   {[(:cat evt) (:op evt)] delta})))
         (apply merge-with merge-fn))))
(defn get-all-deltas
  "Accumulates the deltas of all ids in the given log into a single map,
  combining the values of keys that occur under several ids with merge-fn."
  [log]
  (let [per-id (map #(get-deltas log %) (get-ids log))]
    (apply merge-with merge-fn per-id)))
(defn get-all-deltas-per-id
  "Returns a map from every id in the log to the deltas computed for that id
  by get-deltas."
  [log]
  (apply merge-with merge-fn
         (for [id (get-ids log)]
           {id (get-deltas log id)})))
;; ---------------------------------------------------
(defn- std-dev
  "Population standard deviation of `samples` (the squared deviations are
  divided by the sample count, not count - 1).
  Returns 0.0 for an empty or nil collection instead of dividing by zero."
  [samples]
  (if (empty? samples)
    0.0
    (let [n (count samples)
          mean (/ (reduce + samples) n)
          squared-diffs (map #(Math/pow (- % mean) 2) samples)]
      (Math/sqrt (/ (reduce + squared-diffs) n)))))
(defn- median
  "Median of `samples`, which do not need to be sorted.  For an even number
  of samples the two middle values are averaged.  Returns 0 for an empty
  collection."
  [samples]
  (let [sorted (vec (sort samples)) ; the middle element is only meaningful on ordered data
        c (count sorted)
        m (quot c 2)]
    (cond
      (zero? c) 0
      (odd? c) (nth sorted m)
      :else (/ (+ (nth sorted m) (nth sorted (dec m))) 2))))
(defn- round
  "Rounds `n` to two decimal places and returns the result as a double."
  [n]
  (-> n
      (* 100.0)
      Math/round
      (/ 100.0)))
(defn- timings
  "Summary statistics for `samples`, which is either a single value or a
  collection of values.  Every sample is divided by `factor` first.
  :total, :average and :med are rounded to two decimals; :max, :min and
  :std-deviation are reported as computed."
  [factor samples]
  (let [raw (if (coll? samples) samples [samples])
        scaled (map #(/ % factor) raw)
        n (count scaled)
        sum (reduce + scaled)]
    {:max (apply max scaled)
     :min (apply min scaled)
     :count n
     :total (round sum)
     :average (round (/ sum n))
     :med (round (median scaled))
     :std-deviation (std-dev scaled)}))
(defn gen-latency-report
  "Generates a latency report for the given log: a map from every key
  produced by get-all-deltas to the timings of its deltas, each delta
  divided by `factor`."
  [log factor]
  (into {}
        (for [[k ds] (get-all-deltas log)]
          [k (timings factor ds)])))
; ---------------------------------------------------
(defmacro log-time
  "Evaluates the form `f` and returns its value, emitting two timed events
  around it under id (str ns) and category :fn: one with op <name>-start
  before the evaluation and one with op <name>-end, whose :parent-op is the
  start op, after it.
  The `ns` and `name` forms are evaluated exactly once.  No end event is
  emitted when `f` throws."
  [ns name f]
  `(let [id# (str ~ns)
         start-op# (str ~name "-start")
         end-op# (str ~name "-end")]
     (emit-timed {:id id# :cat :fn :op start-op#})
     (let [r# ~f]
       (emit-timed {:id id# :cat :fn :op end-op# :parent-op start-op#})
       r#)))
;; ==============================================================
;; Test
(comment
;; REPL scratch: emit a sample chain of events under one random id, with
;; random pauses between parent and child operations.
(defn log-some []
(let [id (java.util.UUID/randomUUID)
rnd (java.util.Random.)]
(emit-timed {:id id :op :add})
(emit-timed {:id id :cat :backend :op :send})
(Thread/sleep (.nextInt rnd 10))
(emit-timed {:id id :cat :backend :op :ack :parent-op :send})
(Thread/sleep (.nextInt rnd 20))
(emit-timed {:id id :cat :backend :op :sub :parent-op :send})
(emit-timed {:id id :cat :distribution :op :contribute})
(emit-timed {:id id :cat :distribution :op :chain-contribute})
(Thread/sleep (.nextInt rnd 50))
(emit-timed {:id id :cat :distribution :op :chain-event :parent-op :contribute})
(emit-timed {:id id :op :done :parent-op :add})
))
;; Time two trivial expressions; both are logged under the current namespace.
(log-time *ns* "inc" (inc 1))
(log-time *ns* "dec" (dec 1))
;; Run a thousand sample chains concurrently.
(doseq [i (range 1000)]
(future (log-some)))
;; Number of distinct ids in the log.
(count @log)
;; Deltas of the first id in the log.
(->> @log keys first (get-deltas @log))
;; Full report, every delta divided by 1000000.0, with the time taken to build it.
(time
(gen-latency-report @log 1000000.0))
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment