Created
October 3, 2014 17:28
-
-
Save ianoc/416027db838f6aa98f18 to your computer and use it in GitHub Desktop.
Scalding Riemann
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
GUI settings used: | |
{ | |
"server": "127.0.0.1:5556", | |
"server_type": "ws", | |
"workspaces": [ | |
{ | |
"name": "Riemann", | |
"view": { | |
"type": "Balloon", | |
"weight": 1, | |
"id": "453c0de5a5decb0e73f9cd7e92c0cb9f4577e51f", | |
"version": 15, | |
"child": { | |
"type": "VStack", | |
"weight": 1, | |
"id": "24979d4fbc71c7616f4673f5085084ae7b39e60d", | |
"version": 15, | |
"children": [ | |
{ | |
"type": "Grid", | |
"weight": 1, | |
"id": "82441354a028d76ce8e7e94f22a3b66b8965d4b4", | |
"version": 13, | |
"title": "Riemann", | |
"query": "service =~ \"scaldingprofiler fn %\" and not service =~ \"% epollWait\" and not service =~ \"% doPoll0\" and not service =~ \"% waitForProcessExit\"", | |
"max": "", | |
"rows": "service", | |
"cols": "rate", | |
"row_sort": "metric", | |
"col_sort": "lexical" | |
} | |
] | |
} | |
}, | |
"id": "7a7c436c3a0a978374a4c9f58b13e0ea5e348194" | |
} | |
] | |
} | |
*/ | |
/* | |
Rieman config | |
; -*- mode: clojure; -*- | |
; vim: filetype=clojure | |
(require '[clojure.string :as str]) | |
(logging/init {:file "riemann.log"}) | |
; Listen on the local interface over TCP (5555), UDP (5555), and websockets | |
; (5556) | |
(let [host "0.0.0.0"] | |
(tcp-server {:host host}) | |
(udp-server {:host host}) | |
(ws-server {:host host})) | |
; Expire old events from the index every 5 seconds. | |
(periodically-expire 380) | |
(defn profiler [index] | |
(where (not (expired? event)) | |
(splitp re-matches service | |
; Aggregate rate of samples taken | |
#".*profiler rate" (coalesce | |
; Total sample rate | |
(smap folds/sum | |
(with :host nil | |
index)) | |
; Distinct number of hosts | |
(smap folds/count | |
(adjust [:service str/replace | |
"rate" "hosts"] | |
(with {:host nil :ttl 240} | |
index)))) | |
; Flatten function times across hosts, updating every 60s. | |
#".*profiler fn .+" | |
(pipe - (by :service | |
(coalesce 30 | |
(smap folds/sum | |
(with {:host nil :ttl 240} -)))) | |
; And index the top 10. | |
(top 20 :metric | |
index | |
(with :state "expired" index)))))) | |
; I usually have a top-level splitp to route events to various subsystems. | |
(let [index (index)] | |
(streams | |
(splitp re-find service | |
; Route profiler events to the profiler | |
#"^scaldingprofiler " (profiler index) | |
; Index anything else | |
index))) | |
*/ | |
def fireUpRiemann() { | |
import clojure.java.api.Clojure | |
import clojure.lang.IFn | |
// Clojure var | |
def v(ns: String, variable: String): IFn = | |
Clojure.`var`(ns, variable) | |
// Clojure.read | |
def r(x: String): Any = | |
Clojure.read(x) | |
v("clojure.core", "require").invoke(r("riemann.jvm-profiler")) | |
v("riemann.jvm-profiler", "start-global!").invoke(r(""" | |
{:host "riemannHost" | |
:prefix "scalding" | |
:load 0.9 | |
:dt 1} | |
""")) | |
} | |
@transient lazy val riemann = fireUpRiemann | |
val tp = MySource.map{ t => | |
fireUpRiemann | |
t} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment