Skip to content

Instantly share code, notes, and snippets.

Last active April 22, 2024 06:42
Show Gist options
  • Save v-zhuravlev/d593d12eba6e89f71a253d68da7916d6 to your computer and use it in GitHub Desktop.
Save v-zhuravlev/d593d12eba6e89f71a253d68da7916d6 to your computer and use it in GitHub Desktop.
Send metrics to influx(and zabbix_sender) from riemann only if value changed(or when heartbeat interval of 300seconds) or if state changed (for service client-test)
;; InfluxDB database details where Riemann will store the data. Setup :username and :password
;; if you added security step behind riemann database access.
(def influxdb-creds {
:version :0.9
:host "localhost"
:port 8086
:db "riemann"
; :username "riemann"
; :password "riemann"
(def influxBatchSender
(batch 100 1/10
(async-queue! :agg {:queue-size 1000
:core-pool-size 1
:max-pool-size 4
:keep-alive-time 60000}
(influxdb influxdb-creds))))
;; Riemann log file location
(logging/init {:file "/var/log/riemann/riemann.log"})
;;check index
(defn is-metric-changed?
"Is Riemann currently service metric changed?"
(->> (list 'and
(list '= ':host (get event :host))
(list '= ':service (get event :service)))
; Search the current Riemann core's index for any matching events
(riemann.index/search (:index @core))
; Take the first match
; Find its metric
; Is it the value same as before?
(= (get event :metric))))
(defn heartbeat
"Sends metric every 'interval' seconds to 'output-stream' destination"
[interval output-stream]
(pipe -
;double check that will work with same service but different host
(by [:host :service](
;heartbeat 1 packet per 5mins for each service/host
throttle 1 interval
(partial prn "Will send anyway as heartbeat")
;zabbix-var to wrap service name. use 'nowrap' if you do not want to wrap
(def zabbix-var "riemann")
(def to-zabbix-sender-file
(fn[event key-wrap] (info ( str
(get event :host)
"\" \""
(if (= key-wrap "nowrap")
(get event :service)
;else wrap in wrap var
(get event :service)
"\" \""
(get event :metric)
(def test-thresholds
{"client-test" {:warning 2 :critical 3}})
(def test-exact-thresholds
{"client-test" {:exact 1}})
(require '[org.spootnik.riemann.thresholds :refer [threshold-check]])
;; listen on the local interface over TCP (5555), UDP (5555) and websockets (5556)
(let [host ""]
(tcp-server {:host host})
(udp-server {:host host})
(ws-server {:host host}))
;(instrumentation {:enabled? false})
;; Expire states from its core's index every 60 seconds. Default is 10.
(periodically-expire 60)
(let [index (index)]
(default :ttl 60
;if metric is not the same ,send it now
(where (not (is-metric-changed? event))
;(partial prn "Metric has changed")
#(to-zabbix-sender-file % zabbix-var)
;if it is the same: send only once per 300
;(partial prn "Metric is the same")
(heartbeat 300 influxBatchSender)
(heartbeat 300 #(to-zabbix-sender-file % zabbix-var))
(where (service "client-test")
(smap (threshold-check test-thresholds)
(changed :state
#(to-zabbix-sender-file % zabbix-var)
Copy link

v-zhuravlev commented Feb 12, 2017


Copy link

v-zhuravlev commented Feb 13, 2017

another example:
run in cli: tail -f /var/log/riemann/riemann.log | awk -F ' - ' '/\".+\" \".+\" \".+\"/ {print $3}' | zabbix_sender -z --real-time -i - -vv
Where riemann.service should match zabbix zabbix-var["item.key"]

Copy link

v-zhuravlev commented Feb 14, 2017

telegraf sample config used with this:

# Telegraf Configuration
# Telegraf is entirely plugin driven. All metrics are gathered from the
# declared inputs, and sent to the declared outputs.
# Plugins must be declared in here to be active.
# To deactivate a plugin, comment out the name and any variables.
# Use 'telegraf -config telegraf.conf -test' to see what metrics a config
# file would generate.
# Environment variables can be used anywhere in this config file, simply prepend
# them with $. For strings the variable must be within quotes (ie, "$STR_VAR"),
# for numbers and booleans they should be plain (ie, $INT_VAR, $BOOL_VAR)

# Global tags can be specified here in key="value" format.
  # dc = "us-east-1" # will tag all metrics with dc=us-east-1
  # rack = "1a"
  ## Environment variables can be used as tags, and throughout the config file
  # user = "$USER"

# Configuration for telegraf agent
  ## Default data collection interval for all inputs
  interval = "10s"
  ## Rounds collection interval to 'interval'
  ## ie, if interval="10s" then always collect on :00, :10, :20, etc.
  round_interval = true

  ## Telegraf will send metrics to outputs in batches of at most
  ## metric_batch_size metrics.
  ## This controls the size of writes that Telegraf sends to output plugins.
  metric_batch_size = 1000

  ## For failed writes, telegraf will cache metric_buffer_limit metrics for each
  ## output, and will flush this buffer on a successful write. Oldest metrics
  ## are dropped first when this buffer fills.
  ## This buffer only fills when writes fail to output plugin(s).
  metric_buffer_limit = 10000

  ## Collection jitter is used to jitter the collection by a random amount.
  ## Each plugin will sleep for a random time within jitter before collecting.
  ## This can be used to avoid many plugins querying things like sysfs at the
  ## same time, which can have a measurable effect on the system.
  collection_jitter = "0s"

  ## Default flushing interval for all outputs. You shouldn't set this below
  ## interval. Maximum flush_interval will be flush_interval + flush_jitter
  flush_interval = "10s"
  ## Jitter the flush interval by a random amount. This is primarily to avoid
  ## large write spikes for users running a large number of telegraf instances.
  ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
  flush_jitter = "0s"

  ## By default, precision will be set to the same timestamp order as the
  ## collection interval, with the maximum being 1s.
  ## Precision will NOT be used for service inputs, such as logparser and statsd.
  ## Valid values are "ns", "us" (or "µs"), "ms", "s".
  precision = ""

  ## Logging configuration:
  ## Run telegraf with debug log messages.
  debug = false
  ## Run telegraf in quiet mode (error log messages only).
  quiet = false
  ## Specify the log file name. The empty string means to log to stderr.
  logfile = ""

  ## Override default hostname, if empty use os.Hostname()
  hostname = ""
  ## If set to true, do no set the "host" tag in the telegraf agent.
  omit_hostname = false

#                            OUTPUT PLUGINS                                   #

# Configuration for influxdb server to send metrics to
  ## The full HTTP or UDP endpoint URL for your InfluxDB instance.
  ## Multiple urls can be specified as part of the same cluster,
  ## this means that only ONE of the urls will be written to each interval.
  # urls = ["udp://localhost:8089"] # UDP endpoint example
  urls = ["http://localhost:8086"] # required
  ## The target database for metrics (telegraf will create it if not exists).
  database = "telegraf" # required

  ## Retention policy to write to. Empty string writes to the default rp.
  retention_policy = ""
  ## Write consistency (clusters only), can be: "any", "one", "quorum", "all"
  write_consistency = "any"

  ## Write timeout (for the InfluxDB client), formatted as a string.
  ## If not provided, will default to 5s. 0s means no timeout (not recommended).
  timeout = "5s"
  # username = "telegraf"
  # password = "metricsmetricsmetricsmetrics"
  ## Set the user agent for HTTP POSTs (can be useful for log differentiation)
  # user_agent = "telegraf"
  ## Set UDP payload size, defaults to InfluxDB UDP Client default (512 bytes)
  # udp_payload = 512

  ## Optional SSL Config
  # ssl_ca = "/etc/telegraf/ca.pem"
  # ssl_cert = "/etc/telegraf/cert.pem"
  # ssl_key = "/etc/telegraf/key.pem"
  ## Use SSL but skip chain & host verification
  # insecure_skip_verify = false

  exe = "influxd"
  exe = "telegraf"
  pattern = "java -cp /usr/lib/riemann/riemann.jar"

# # Retrieves SNMP values from remote agents
   agents = [ "","","" ]
    version = 2
  community = "public"
  name = "snmp"

    name = "host"
    oid = "RFC1213-MIB::sysName.0"
    is_tag = true

    name = "snmp"
    inherit_tags = [ "host" ]
    oid = "IF-MIB::ifTable"

      name = "ifName"
      oid = "IF-MIB::ifName"
      is_tag = true

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment