Skip to content

Instantly share code, notes, and snippets.

@ianoc
Created October 3, 2014 17:28
Show Gist options
  • Save ianoc/416027db838f6aa98f18 to your computer and use it in GitHub Desktop.
Save ianoc/416027db838f6aa98f18 to your computer and use it in GitHub Desktop.
Scalding Riemann
/*
GUI settings used:
{
"server": "127.0.0.1:5556",
"server_type": "ws",
"workspaces": [
{
"name": "Riemann",
"view": {
"type": "Balloon",
"weight": 1,
"id": "453c0de5a5decb0e73f9cd7e92c0cb9f4577e51f",
"version": 15,
"child": {
"type": "VStack",
"weight": 1,
"id": "24979d4fbc71c7616f4673f5085084ae7b39e60d",
"version": 15,
"children": [
{
"type": "Grid",
"weight": 1,
"id": "82441354a028d76ce8e7e94f22a3b66b8965d4b4",
"version": 13,
"title": "Riemann",
"query": "service =~ \"scaldingprofiler fn %\" and not service =~ \"% epollWait\" and not service =~ \"% doPoll0\" and not service =~ \"% waitForProcessExit\"",
"max": "",
"rows": "service",
"cols": "rate",
"row_sort": "metric",
"col_sort": "lexical"
}
]
}
},
"id": "7a7c436c3a0a978374a4c9f58b13e0ea5e348194"
}
]
}
*/
/*
Riemann config
; -*- mode: clojure; -*-
; vim: filetype=clojure
(require '[clojure.string :as str])
(logging/init {:file "riemann.log"})
; Listen on all interfaces (0.0.0.0) over TCP (5555), UDP (5555), and websockets
; (5556)
(let [host "0.0.0.0"]
(tcp-server {:host host})
(udp-server {:host host})
(ws-server {:host host}))
; Expire old events from the index every 380 seconds.
(periodically-expire 380)
(defn profiler [index]
(where (not (expired? event))
(splitp re-matches service
; Aggregate rate of samples taken
#".*profiler rate" (coalesce
; Total sample rate
(smap folds/sum
(with :host nil
index))
; Distinct number of hosts
(smap folds/count
(adjust [:service str/replace
"rate" "hosts"]
(with {:host nil :ttl 240}
index))))
; Flatten function times across hosts, updating every 30s.
#".*profiler fn .+"
(pipe - (by :service
(coalesce 30
(smap folds/sum
(with {:host nil :ttl 240} -))))
; And index the top 20.
(top 20 :metric
index
(with :state "expired" index))))))
; I usually have a top-level splitp to route events to various subsystems.
(let [index (index)]
(streams
(splitp re-find service
; Route profiler events to the profiler
#"^scaldingprofiler " (profiler index)
; Index anything else
index)))
*/
/**
 * Starts the Riemann JVM profiler in the current JVM through Clojure's Java API.
 *
 * Loads the `riemann.jvm-profiler` namespace with `clojure.core/require`, then
 * invokes its `start-global!` var with an option map parsed from the literal
 * below. All option values are hard-coded; `"riemannHost"` looks like a
 * placeholder for the real Riemann server address -- TODO confirm before use.
 *
 * Clojure and riemann-jvm-profiler must be on the classpath at runtime.
 */
def fireUpRiemann(): Unit = {
  import clojure.java.api.Clojure
  import clojure.lang.IFn

  // Looks up a Clojure var by namespace and name.
  def v(ns: String, variable: String): IFn =
    Clojure.`var`(ns, variable)

  // Parses one Clojure form from its textual representation.
  def r(x: String): Any =
    Clojure.read(x)

  // The namespace has to be loaded before its vars can be invoked.
  v("clojure.core", "require").invoke(r("riemann.jvm-profiler"))

  // The "scalding" prefix lines up with the `scaldingprofiler ` service names
  // that the Riemann config in the header comment routes on.
  // NOTE(review): :load is presumably the target CPU fraction spent sampling and
  // :dt the reporting interval in seconds -- verify against the
  // riemann-jvm-profiler README; 0.9 is a very aggressive sampling load.
  v("riemann.jvm-profiler", "start-global!").invoke(r("""
{:host "riemannHost"
:prefix "scalding"
:load 0.9
:dt 1}
"""))
}
// Start-once guard for the profiler: a lazy val's initializer runs at most once
// per instance, on first access. It is @transient so the initialized state is
// not carried along by Java serialization.
// NOTE(review): assumes the enclosing class is what gets serialized and shipped
// to the tasks, so each task JVM initializes its own copy -- confirm.
@transient lazy val riemann: Unit = fireUpRiemann()

// Identity map whose only purpose is to trigger profiler start-up in whichever
// JVM processes the records. It forces the lazy val rather than calling
// fireUpRiemann() directly, so start-up is attempted once instead of once per
// record.
val tp = MySource.map { t =>
  riemann
  t
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment