Skip to content

Instantly share code, notes, and snippets.

@calston
Created December 12, 2014 20:03
Show Gist options
  • Save calston/0ecf39f4303627605aef to your computer and use it in GitHub Desktop.
Save calston/0ecf39f4303627605aef to your computer and use it in GitHub Desktop.
; -*- mode: clojure; -*-
; vim: filetype=clojure
(require 'capacitor.core)
(require 'capacitor.async)
(require 'clojure.core.async)
(require 'clj-http.client)
(require 'cheshire.core)
(defn ghost [token]
(let [uri (str "http://localhost:8000/api/event/" token)]
(fn [events] (clj-http.client/post uri
{:form-params
{:payload (cheshire.core/generate-string {
:host (:host events),
:state (:state events),
:service (:service events),
:description (:description events),
:metric (:metric events)
})}
}
))
)
)
(def ghoster (ghost "CHANGE_ME"))
(defn make-async-influxdb-client [opts]
(let [client (capacitor.core/make-client opts)
;; Make a channel to buffer influxdb events
events-in (capacitor.async/make-chan)
;; Make a channel to collect influxdb responses (ignored, really)
resp-out (capacitor.async/make-chan)]
;; Start the run loop with a batch size of max 100 events and max 10
;; seconds
(capacitor.async/run! events-in resp-out client 100 10000)
(fn [series payload]
(let [p (merge payload {
:series series
:time (* 1000 (:time payload)) ;; s → ms
})]
(clojure.core.async/put! events-in p)
)
)
)
)
(def influx (make-async-influxdb-client {
:host "localhost"
:port 8086
:username "riemann"
:password "riemann"
:db "riemann"
})
)
(logging/init {:file "/var/log/riemann/riemann.log"})
; Listen on the local interface over TCP (5555), UDP (5555), and websockets
; (5556)
(let [host "0.0.0.0"]
(tcp-server {:host host})
(udp-server {:host host})
(ws-server {:host host}))
; Expire old events from the index every 5 seconds.
(periodically-expire 60)
(let [index (index)]
; Inbound events will be passed to these streams:
(streams
(fn [event]
(let [series (format "%s.%s" (:host event) (:service event))]
(influx series {
:time (:time event)
:value (:metric event)
})
)
)
(where (not (expired? event))
(by [:host :service]
(stable 30 :state (changed :state ghoster))
)
)
index
))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment