Skip to content

Instantly share code, notes, and snippets.

@erenfro
Created August 18, 2015 03:55
Show Gist options
  • Save erenfro/177045fef620e2aaac6f to your computer and use it in GitHub Desktop.
# ExponentialDecayTimer
#
# Implements an exponential backoff timer for reconnecting to metrics
# backends.
class ATT_ExponentialDecayTimer
  attr_accessor :reconnect_time

  def initialize
    @reconnect_time = 0
  end

  # Compute the next reconnect delay in seconds.
  #
  # Grows as 2^(attempts - 1) with a random jitter factor in [0.5, 1.0)
  # to avoid thundering-herd reconnects, and is capped at
  # max_reconnect_time. Once the stored delay reaches the cap it is
  # returned unchanged on subsequent calls.
  def get_reconnect_time(max_reconnect_time, connection_attempt_count)
    return @reconnect_time unless @reconnect_time < max_reconnect_time

    backoff = 2**(connection_attempt_count - 1)
    jittered = (@reconnect_time + backoff) * (0.5 * (1.0 + rand))
    @reconnect_time = jittered <= max_reconnect_time ? jittered : max_reconnect_time
    @reconnect_time
  end
end
module Sensu
module Extension
# Set up basic error handling and connection management, building on
# Sensu's logging capability to report error states.
class ATT_RelayConnectionHandler < EM::Connection
  # XXX: These should be runtime configurable.
  # NOTE(review): MAX_RECONNECT_ATTEMPTS is never consulted — reconnects
  # are currently unbounded; confirm whether a cap was intended.
  MAX_RECONNECT_ATTEMPTS = 10
  MAX_RECONNECT_TIME = 300 # seconds

  attr_accessor :message_queue, :connection_pool
  attr_accessor :name, :host, :port, :connected
  attr_accessor :reconnect_timer

  # EM callback: runs once when the connection object is created.
  # ignore :reek:TooManyStatements
  def post_init
    @is_closed = false
    @connection_attempt_count = 0
    @max_reconnect_time = MAX_RECONNECT_TIME
    # NOTE(review): assigning these ivars does not configure EventMachine;
    # EM::Connection exposes #comm_inactivity_timeout= and
    # #pending_connect_timeout= setter methods for that — confirm intent.
    @comm_inactivity_timeout = 0 # disable inactivity timeout
    @pending_connect_timeout = 30 # seconds
    @reconnect_timer = ATT_ExponentialDecayTimer.new
  end

  # EM callback: the outbound connection attempt succeeded.
  def connection_completed
    @connected = true
  end

  # Mark the close as intentional so #unbind does not schedule a reconnect.
  def close_connection(*args)
    @is_closed = true
    @connected = false
    super(*args)
  end

  # NOTE(review): likely a typo for comm_inactivity_timeout; as written
  # nothing calls this method. Kept under its original name to avoid
  # changing the public interface — confirm against EventMachine usage.
  def com_inactivity_timeout
    logger.info("[AllTheThings] Connection to #{@name} timed out.")
    schedule_reconnect
  end

  # EM callback: connection dropped. Reconnect unless we closed it ourselves.
  def unbind
    @connected = false
    unless @is_closed
      logger.info('[AllTheThings] Connection closed unintentionally.')
      schedule_reconnect
    end
  end

  def send_data(*args)
    super(*args)
  end

  # Override EM::Connection.receive_data to prevent it from calling
  # puts and randomly logging nonsense to sensu-server.log
  def receive_data(data)
  end

  # Reconnect normally attempts to connect at the end of the tick.
  # Delay the reconnect for some seconds.
  def reconnect(time)
    EM.add_timer(time) do
      logger.info("[AllTheThings] Attempting to reconnect relay channel: #{@name}.")
      super(@host, @port)
    end
  end

  # Next backoff delay (seconds) from the decay timer.
  def get_reconnect_time
    @reconnect_timer.get_reconnect_time(
      @max_reconnect_time,
      @connection_attempt_count
    )
  end

  # Schedule a delayed reconnect attempt; returns the chosen delay
  # (nil when already connected).
  def schedule_reconnect
    unless @connected
      @connection_attempt_count += 1
      reconnect_time = get_reconnect_time
      # FIX: previously interpolated @reconnect_time, an ivar this class
      # never assigns (the timer object is @reconnect_timer), so the log
      # always printed an empty delay. Log the computed local instead.
      logger.info("[AllTheThings] Scheduling reconnect in #{reconnect_time} seconds for relay channel #{@name}.")
      reconnect(reconnect_time)
    end
    reconnect_time
  end

  def logger
    Sensu::Logger.get
  end
end # ATT_RelayConnectionHandler
# Endpoint
#
# An endpoint is a backend metric store. This is a compositional object
# to help keep the rest of the code sane.
class ATT_Endpoint
  # EM:Connection.send_data batches network connection writes in 16KB.
  # We should start out by having all data in the queue flush in the
  # space of a single loop tick.
  MAX_QUEUE_SIZE = 16384

  attr_accessor :connection, :queue

  # name       - label used in log lines (also assigned to the connection)
  # host, port - backend address handed to EM.connect
  # queue_size - flush threshold in bytes. FIX: previously accepted but
  #              never stored or used; now honored by #relay_event. A nil
  #              (missing config key) falls back to MAX_QUEUE_SIZE.
  def initialize(name, host, port, queue_size = MAX_QUEUE_SIZE)
    @queue = []
    @max_queue_size = queue_size || MAX_QUEUE_SIZE
    @connection = EM.connect(host, port, ATT_RelayConnectionHandler)
    @connection.name = name
    @connection.host = host
    @connection.port = port
    @connection.message_queue = @queue
    # Periodic visibility into queue depth.
    EventMachine::PeriodicTimer.new(60) do
      Sensu::Logger.get.info("[AllTheThings] relay queue size for #{name}: #{queue_length}")
    end
  end

  # Total bytes currently buffered in the queue.
  def queue_length
    @queue
      .map(&:bytesize)
      .reduce(:+) || 0
  end

  # Write all buffered data in one call; clear the queue on success.
  def flush_to_net
    sent = @connection.send_data(@queue.join)
    @queue = [] if sent > 0
  end

  # Buffer a datum and flush once the configured byte threshold is hit.
  # Data is silently dropped while the connection is down (matches the
  # original behavior — confirm whether buffering offline was intended).
  def relay_event(data)
    if @connection.connected
      @queue << data
      if queue_length >= (@max_queue_size || MAX_QUEUE_SIZE)
        flush_to_net
        # FIX: was Sensu::Logger.get_debug(...), which is not a method on
        # Sensu::Logger; fetch the logger with .get, then call #debug.
        Sensu::Logger.get.debug('[AllTheThings] relay.flush_to_net: successfully flushed to network')
      end
    end
  end

  # Flush any buffered data, then close the connection gracefully.
  def stop
    if @connection.connected
      flush_to_net
      @connection.close_connection_after_writing
    end
  end
end # ATT_Endpoint
class AllTheThings < Bridge
# Set up empty endpoint registry and the per-backend mutator dispatch
# table (endpoint name => Method that formats an event for that backend).
def initialize
  super
  @endpoints = {}
  @initialized = false
  @mutators = {}
  %i[graphite influxdb opentsdb elasticsearch].each do |backend|
    @mutators[backend] = method(backend)
  end
end
# ignore :reek:LongMethod
# Build an ATT_Endpoint for every enabled backend configured under the
# :allthethings settings block. (Removed the large commented-out legacy
# endpoint-wiring variants that previously lived here.)
def post_init
  @settings[:allthethings].keys.each do |endpoint_name|
    ep_name = endpoint_name.intern
    ep_settings = @settings[:allthethings][ep_name]
    # FIX: sibling #run reads these settings with symbol keys
    # (settings[...][:enabled]), while this method used string keys
    # ('enabled', 'host', ...), so at most one style can match the actual
    # settings hash. Prefer symbols, falling back to strings for safety.
    enabled = ep_settings[:enabled] || ep_settings['enabled']
    next unless enabled

    @endpoints[ep_name] = ATT_Endpoint.new(
      ep_name,
      ep_settings[:host] || ep_settings['host'],
      ep_settings[:port] || ep_settings['port'],
      ep_settings[:max_queue_size] || ep_settings['max_queue_size']
    )
  end
end
# Extension name under which Sensu registers this bridge.
def name
'all_the_things'
end
# Human-readable description shown by Sensu for this extension.
def description
'Writes standard checks and metrics to configured endpoints'
end
# Memoized extension options: per-backend defaults (all disabled),
# overridden by the user's allthethings settings when present.
def options
  return @options if @options

  @options = {
    graphite: {
      enabled: false
    },
    opentsdb: {
      enabled: false
    },
    influxdb: {
      enabled: false
    }
  }
  # FIX: every other method accesses these settings with the symbol key
  # :allthethings, but this one used the string "allthethings" — at most
  # one of those can match. Prefer the symbol, keep the string fallback
  # for backward compatibility.
  user_settings = @settings[:allthethings] || @settings["allthethings"]
  @options.merge!(user_settings) if user_settings.is_a?(Hash)
  @options
end
# Bridge entry point: Sensu invokes this for every event.
# For each enabled, subscribed endpoint the event is run through that
# endpoint's mutator; then, off the reactor thread via EM.defer, the
# extension config and the raw event are dumped to /tmp and +block+ is
# invoked with the defer result ([message, exit_status]).
def run(event, &block)
# NOTE(review): this begin/end has no rescue/ensure clause, so it is a no-op.
begin
logger.debug("[AllTheThings] all_the_things.run() handling event - #{event}")
# Checks without an explicit :type are treated as standard checks.
# NOTE(review): this check_type local is never used in this scope.
if event[:check].key?(:type)
check_type = event[:check][:type]
else
check_type = "standard"
end
settings[:allthethings].keys.each do |endpoint_name|
# Skip endpoints not enabled in configuration.
if not settings[:allthethings][endpoint_name][:enabled]
next
end
ep_name = endpoint_name.intern
# Only relay when this server subscribes to the endpoint's bus channel.
if @subscriptions.include?('bus.' << ep_name.to_s)
# Endpoints without a registered mutator are skipped.
mutator = @mutators[ep_name] || next
mutated = mutate(mutator, ep_name, event)
# NOTE(review): relaying is disabled — mutated output is currently dropped.
#@endpoints[ep_name].relay_event(mutated) unless mutated.empty?
end
end
end
# Runs on EM's thread pool: write config and event JSON to /tmp for
# debugging; the returned [message, status] pair is handed to +block+.
update_allthethings_file = Proc.new do
allthethings_json = MultiJson.dump(options)
File.open('/tmp/allthethings.json', 'w') do |file|
file.write(allthethings_json)
end
event_json = MultiJson.dump(event)
# NOTE(review): check_type is recomputed inside the deferred proc —
# presumably so it is valid on the worker thread; confirm necessity.
if event[:check].key?(:type)
check_type = event[:check][:type]
else
check_type = "standard"
end
File.open("/tmp/sensu_#{check_type}.json", 'w') do |file|
file.write(event_json)
end
["wrote config to /tmp/allthethings.json and event data to /tmp/sensu_#{check_type}.json", 0]
end
EM::defer(update_allthethings_file, block)
end
# Flush and close every endpoint connection, then invoke the optional
# completion callback supplied by the caller.
def stop
  @endpoints.each_value(&:stop)
  yield if block_given?
end
# Apply the endpoint-specific mutator to an event and return the mutated
# payload (the String the caller would relay to the backend).
#
# NOTE(review): the original body was unfinished — the check-type branches
# contained bare "." placeholder lines (a syntax error) and an extra
# trailing `end` that unbalanced the class. Reconstructed to dispatch to
# the mutator, whose signature is (event, check_type) per #graphite and
# #influxdb below — confirm against the intended design.
def mutate(mutator, ep_name, event)
  logger.debug("[AllTheThings] mutating for #{ep_name.inspect}")
  check = event[:check]
  # Checks without an explicit :type are standard checks (matches #run).
  check_type = check.key?(:type) ? check[:type] : "standard"
  mutator.call(event, check_type)
end
# Mutate an event into graphite plaintext lines ("path<TAB>value<TAB>ts").
#
# NOTE(review): the original referenced @event (never assigned here; the
# event arrives as the +event+ parameter) and an undefined local `metric`,
# so it raised NameError. Reconstructed: metric checks parse their
# "name value timestamp" output lines; standard checks emit the check
# status as a single datapoint — confirm the intended standard-check path.
def graphite(event, check_type)
  out = ''
  client_name = event[:client][:name]
  check = event[:check]
  if check_type == "standard"
    # NOTE(review): timestamp source assumed to be check[:executed] — confirm.
    timestamp = check[:executed] || Time.now.to_i
    out << "#{client_name}.#{check[:name]}.status\t#{check[:status]}\t#{timestamp}\n"
  elsif check_type == "metric"
    # Use the short hostname as the metric prefix.
    prefix = client_name.split(/\./).first
    check[:output].to_s.each_line do |line|
      name, value, timestamp = line.split
      next unless name && value && timestamp
      out << "#{prefix}.#{name}\t#{value}\t#{timestamp}\n"
    end
  end
  out
end
# Mutate an event for InfluxDB.
# NOTE(review): clearly unfinished — it only emits a "check.<client>"
# prefix, not a full line-protocol point.
# FIX: use the +event+ parameter (the original read @event, which is
# never assigned in this class and interpolated as an empty string).
def influxdb(event, check_type)
  out = ''
  out << "check.#{event[:client][:name]}"
end
# Convenience accessor for the shared Sensu logger instance.
def logger
Sensu::Logger.get
end
end # AllTheThings < Bridge
end # module Extension
end # module Sensu
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment