@drbobbeaty
Created September 8, 2014 14:07
Clojure code to get the bolt metrics for a Storm Topology
(ns gym.storm.nimbus
  "Namespace for exercising Nimbus to find out facts about the specific storm
  cluster. This data can be used to monitor the cluster and look at stats in
  a way that puts very little load on the overall system."
  (:require [backtype.storm.clojure :refer :all]
            [backtype.storm.config :refer :all]
            [backtype.storm.ui.core :refer :all]
            [clj-endpoints :as ep]
            [clj-endpoints.persistence.redis :refer [wcar]]
            [clj-endpoints.util :as util]
            [clojure.string :as cs]
            [clojure.tools.logging :refer [error infof warnf]]
            [taoensso.carmine :as car])
  (:import [org.apache.thrift7.transport TSocket TFramedTransport]
           [org.apache.thrift7.protocol TBinaryProtocol]
           [backtype.storm.generated Nimbus$Client SupervisorSummary]))

;; Note: `bolt-summary?`, `group-by-comp` and `compute-bolt-capacity` are not
;; defined here; they appear to be pulled in by the `:refer :all` on
;; backtype.storm.ui.core. `get-emitted` is not defined in this file either and
;; is expected to come from a referred namespace or elsewhere in the project.
(defn get-emitted-totals
  "Function to take a cluster (used to look up its config map) and a topology
  name, and query Nimbus to see what the `emitted` totals are for each of the
  components in the topology. This is a very lightweight way to keep track of
  what's going on in the topology without monitoring all the messages coming
  out of the kafka cluster. Logs a warning and returns nil if the query fails."
  [cluster topo]
  (let [cfg (ep/config cluster)
        ;; 6627 is the default Nimbus Thrift port
        tft (TFramedTransport. (TSocket. (:nimbus cfg) 6627))
        nc  (Nimbus$Client. (TBinaryProtocol. tft))]
    (try
      (.open tft)
      (let [ci   (.getClusterInfo nc)
            ;; fail with a readable message instead of a bare NPE when the
            ;; named topology is not running
            ts   (or (first (filter #(= topo (.get_name %)) (.get_topologies ci)))
                     (throw (IllegalArgumentException.
                              (str "No running topology named " topo))))
            ti   (.getTopologyInfo nc (.get_id ts))
            exes (.get_executors ti)
            ;; group the executors by component id and sum `emitted` per group
            cnts (into {} (for [[k v] (group-by #(.get_component_id %) exes)]
                            [(keyword k) (util/safe-sum (map get-emitted v))]))]
        {:nimbus-host (:nimbus cfg)
         :topology    topo
         :executors   (count exes)
         :counts      cnts})
      (catch Exception e (warnf "Exception thrown: %s" (.getMessage e)))
      ;; always release the socket, even when a Nimbus call throws
      (finally (.close tft)))))
(defn get-capacity
  "Function to take a cluster (used to look up its config map) and a topology
  name, and query Nimbus to see what the capacity is for each of the bolts in
  the topology. This is a very lightweight way to keep track of how busy the
  bolts are without monitoring all the messages coming out of the kafka
  cluster. Logs a warning and returns nil if the query fails."
  [cluster topo]
  (let [cfg (ep/config cluster)
        ;; 6627 is the default Nimbus Thrift port
        tft (TFramedTransport. (TSocket. (:nimbus cfg) 6627))
        nc  (Nimbus$Client. (TBinaryProtocol. tft))]
    (try
      (.open tft)
      (let [ci    (.getClusterInfo nc)
            ;; fail with a readable message instead of a bare NPE when the
            ;; named topology is not running
            ts    (or (first (filter #(= topo (.get_name %)) (.get_topologies ci)))
                      (throw (IllegalArgumentException.
                               (str "No running topology named " topo))))
            tid   (.get_id ts)
            ti    (.getTopologyInfo nc tid)
            st    (.getTopology nc tid)
            exes  (.get_executors ti)
            ;; keep only bolt executors, grouped by component id
            bolts (group-by-comp (filter (partial bolt-summary? st) exes))
            caps  (into {} (for [[id bc] bolts]
                             [(keyword id) (util/to-4dp (compute-bolt-capacity bc))]))]
        {:nimbus-host (:nimbus cfg)
         :topology    topo
         :executors   (count exes)
         :capacity    caps})
      (catch Exception e (warnf "Exception thrown: %s" (.getMessage e)))
      ;; always release the socket, even when a Nimbus call throws
      (finally (.close tft)))))
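
The capacity figure returned by `get-capacity` is easier to interpret with the arithmetic spelled out. The sketch below is a rough, standalone illustration of how a bolt's capacity is commonly described for the Storm UI: tuples executed times average execute latency, divided by the length of the measurement window (capped at 10 minutes), with the bolt taking the largest value among its executors. The function names `executor-capacity` and `bolt-capacity` and the plain-map input shape are hypothetical and are not part of the gist or of Storm; the real `compute-bolt-capacity` works on Thrift `ExecutorSummary` objects and may differ in detail.

```clojure
;; Hypothetical, dependency-free sketch of the capacity arithmetic.
;; Assumption: capacity = (executed * avg-latency-ms) / (window in ms),
;; where the window is the executor uptime capped at 600 seconds.

(defn executor-capacity
  "Fraction of the window one executor spent executing tuples.
  executed    - number of tuples executed in the window
  latency-ms  - average execute latency, in milliseconds
  uptime-secs - executor uptime, in seconds"
  [executed latency-ms uptime-secs]
  (let [window-secs (min uptime-secs 600)]
    (if (pos? window-secs)
      (/ (* executed latency-ms) (* 1000.0 window-secs))
      0.0)))

(defn bolt-capacity
  "Capacity of a bolt: the largest capacity among its executors, or 0.0
  when there are none. Each executor is a map with the keys :executed,
  :latency-ms and :uptime-secs."
  [executor-stats]
  (apply max 0.0
         (map (fn [{:keys [executed latency-ms uptime-secs]}]
                (executor-capacity executed latency-ms uptime-secs))
              executor-stats)))
```

Under these assumptions, a value near 1.0 means the busiest executor of that bolt was executing tuples for almost the whole window, which is usually the signal to raise that bolt's parallelism.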