Created
July 3, 2012 18:47
-
-
Save martintrojer/3041849 to your computer and use it in GitHub Desktop.
Event Logging
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns logevt-cl | |
(:refer-clojure :exclude [==]) | |
(:use [clojure.core.logic]) | |
(:import [java.util UUID])) | |
(defrel id-name ^:index id ^:index name) | |
(defrel id-time ^:index id ^:index time) | |
(defrel id-op ^:index id ^:index op) | |
(defrel id-cat ^:index id ^:index cat) | |
(defrel id-parent-op ^:index id ^:index parent-op) | |
(def log-worker (agent 0)) | |
(defn emit-timed | |
"Emits data to the log, timestamp appended" | |
[data] | |
(let [id (UUID/randomUUID) | |
name (or (:name data) (str id)) | |
time (System/nanoTime)] | |
;; Background thread to minimize impact on caller | |
;; There might be congestion in (fact) in multithreaded scenarios | |
(send log-worker | |
(fn [ctr] | |
(fact id-name id name) | |
(fact id-time id time) | |
(when-let [op (:op data)] (fact id-op id op)) | |
(when-let [cat (:cat data)] (fact id-cat id cat)) | |
(when-let [po (:parent-op data)] (fact id-parent-op id po)) | |
(inc ctr))))) | |
(defn all-deltaso | |
"goal that unifies op/child-op and time/child-time" | |
[name op child-op op-time child-op-time] | |
(fresh [id1 id2] | |
(id-name id1 name) | |
(id-name id2 name) | |
(id-op id1 op) | |
(id-op id2 child-op) | |
(id-time id1 op-time) | |
(id-time id2 child-op-time) | |
(id-parent-op id2 op))) | |
(defn- merge-fn [a b] (if (vector? a) (conj a b) (conj [a] b))) | |
(defn get-all-deltas-with-name | |
"Return all deltas for all names" | |
[] | |
(->> | |
(run* [q] | |
(fresh [name op child-op t child-t] | |
(all-deltaso name op child-op t child-t) | |
(project [name op child-op t child-t] | |
(== q {[name op child-op] (- child-t t)})))) | |
(apply merge-with merge-fn))) | |
(defn get-all-deltas | |
"Return all deltas over all names" | |
[] | |
(->> (get-all-deltas-with-name) | |
(map (fn [[[_ op child-op] v]] {[op child-op] v})) | |
(apply merge-with merge-fn))) | |
(defn- std-dev [samples] | |
(let [n (count samples) | |
mean (/ (reduce + samples) n) | |
intermediate (map #(Math/pow (- %1 mean) 2) samples)] | |
(Math/sqrt | |
(/ (reduce + intermediate) n)))) | |
(defn gen-latency-report | |
"Generate a latency report for given samples" | |
[factor deltas] | |
(->> deltas | |
(map (fn [[key ds]] | |
(let [ds (if (coll? ds) ds [ds]) | |
ds (map #(/ % factor) ds)] | |
[key {:max (apply max ds) | |
:min (apply min ds) | |
:count (count ds) | |
:average (/ (reduce + ds) (count ds)) | |
:std-deviation (std-dev ds)}]))) | |
(into {}))) | |
;; ------- | |
(comment | |
(defn log-some [] | |
(let [name (gensym) | |
rnd (java.util.Random.)] | |
(emit-timed {:name name :op :add}) | |
(emit-timed {:name name :cat :backend :op :send}) | |
(Thread/sleep (.nextInt rnd 10)) | |
(emit-timed {:name name :cat :backend :op :ack :parent-op :send}) | |
(Thread/sleep (.nextInt rnd 20)) | |
(emit-timed {:name name :cat :backend :op :sub :parent-op :send}) | |
(emit-timed {:name name :cat :distribution :op :contribute}) | |
(emit-timed {:name name :cat :distribution :op :chain-contribute}) | |
(Thread/sleep (.nextInt rnd 50)) | |
(emit-timed {:name name :cat :distribution :op :chain-event :parent-op :contribute}) | |
(emit-timed {:name name :op :done :parent-op :add}) | |
)) | |
@log-worker | |
(doseq [i (range 10)] | |
(future (log-some))) | |
(->> | |
(get-all-deltas-with-name) | |
(gen-latency-report 1)) | |
(->> | |
(get-all-deltas) | |
(gen-latency-report 1000000.0)) | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; Some toying around wih event logging, inspired by Mark McGranaghan clojure/conj talk; | |
;; http://blip.tv/clojure/mark-mcgranaghan-logs-as-data-5953857 | |
(defonce ^:private log (agent {} :error-mode :continue)) | |
(defn emit | |
"Emit data to the log" | |
[data] | |
(send log (fn [m] | |
(update-in m [(:id data)] | |
#(conj % data))))) | |
(defn emit-timed | |
"Emit datas to the log, timestamp appended" | |
[data] | |
(emit (assoc data :time (System/nanoTime)))) | |
(defn get-ids | |
"Return a set of ids seen in the log" | |
[log] | |
(keys log)) | |
(defn get-log | |
"Get chronological logs for a given id" | |
[log id] | |
(->> id log (sort-by :time))) | |
(defn- get-single-delta | |
"Get the (minimum) time delta given a category and an event (with a parent-op)" | |
[log evt] | |
(->> log | |
(filter #(and (= (:cat evt) (:cat %)) | |
(= (:parent-op evt) (:op %)))) | |
(map #(- (:time evt) (:time %))) | |
(filter pos?) | |
(apply min))) | |
(def ^:private merge-fn #(if (vector? %1) (conj %1 %2) (conj [%1] %2))) | |
(defn get-deltas | |
"Get all deltas for a given id" | |
[log id] | |
(let [log (get-log log id)] | |
(->> log | |
(filter :parent-op) | |
(map (fn [evt] {[(:cat evt) (:op evt)] | |
(get-single-delta log evt)})) | |
(apply merge-with merge-fn)))) | |
(defn get-all-deltas | |
"Accumulte deltas for all ids in a given log" | |
[log] | |
(->> log | |
get-ids | |
(map (partial get-deltas log)) | |
(apply merge-with merge-fn))) | |
(defn get-all-deltas-per-id | |
"Accumulate deltas for all ids keyed by id" | |
[log] | |
(->> log | |
get-ids | |
(map (fn [id] {id (get-deltas log id)})) | |
(apply merge-with merge-fn))) | |
;; --------------------------------------------------- | |
(defn- std-dev [samples] | |
(let [n (count samples) | |
mean (/ (reduce + samples) n) | |
intermediate (map #(Math/pow (- %1 mean) 2) samples)] | |
(Math/sqrt | |
(/ (reduce + intermediate) n)))) | |
(defn- median [samples] | |
(let [c (count samples) | |
m (int (/ c 2))] | |
(cond | |
(== c 0) 0 | |
(== c 1) (first samples) | |
(odd? c) (nth samples m) | |
:else (let [h (nth samples m) | |
l (nth samples (dec m))] | |
(/ (+ h l) 2))))) | |
(defn- round [n] | |
(let [n (* n 100.0) | |
n (Math/round n)] | |
(/ n 100.0))) | |
(defn- timings [factor samples] | |
(let [s (if (coll? samples) samples [samples]) | |
s (map #(/ % factor) s)] | |
{:max (apply max s) | |
:min (apply min s) | |
:count (count s) | |
:total (round (reduce + s)) | |
:average (round (/ (reduce + s) (count s))) | |
:med (round (median s)) | |
:std-deviation (std-dev s) | |
})) | |
(defn gen-latency-report | |
"Generate a latency report for the time deltas for a given log" | |
[log factor] | |
(->> log | |
get-all-deltas | |
(map (fn [[k ds]] [k (timings factor ds)])) | |
(into {}))) | |
; --------------------------------------------------- | |
(defmacro log-time [ns name f] | |
`(do | |
(emit-timed {:id (str ~ns) :cat :fn :op (str ~name "-start")}) | |
(let [r# ~f] | |
(emit-timed {:id (str ~ns) :cat :fn :op (str ~name "-end") | |
:parent-op (str ~name "-start")}) | |
r#))) | |
;; ============================================================== | |
;; Test | |
(comment | |
(defn log-some [] | |
(let [id (java.util.UUID/randomUUID) | |
rnd (java.util.Random.)] | |
(emit-timed {:id id :op :add}) | |
(emit-timed {:id id :cat :backend :op :send}) | |
(Thread/sleep (.nextInt rnd 10)) | |
(emit-timed {:id id :cat :backend :op :ack :parent-op :send}) | |
(Thread/sleep (.nextInt rnd 20)) | |
(emit-timed {:id id :cat :backend :op :sub :parent-op :send}) | |
(emit-timed {:id id :cat :distribution :op :contribute}) | |
(emit-timed {:id id :cat :distribution :op :chain-contribute}) | |
(Thread/sleep (.nextInt rnd 50)) | |
(emit-timed {:id id :cat :distribution :op :chain-event :parent-op :contribute}) | |
(emit-timed {:id id :op :done :parent-op :add}) | |
)) | |
(log-time *ns* "inc" (inc 1)) | |
(log-time *ns* "dec" (dec 1)) | |
(doseq [i (range 1000)] | |
(future (log-some))) | |
(count @log) | |
(->> @log keys first (get-deltas @log)) | |
(time | |
(gen-latency-report @log 1000000.0)) | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment