-
-
Save erenfro/177045fef620e2aaac6f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ExponentialDecayTimer
#
# Implement an exponential backoff timer for reconnecting to metrics
# backends.
class ATT_ExponentialDecayTimer | |
attr_accessor :reconnect_time | |
def initialize | |
@reconnect_time = 0 | |
end | |
def get_reconnect_time(max_reconnect_time, connection_attempt_count) | |
if @reconnect_time < max_reconnect_time | |
seconds = @reconnect_time + (2**(connection_attempt_count - 1)) | |
seconds = seconds * (0.5 * (1.0 + rand)) | |
@reconnect_time = if seconds <= max_reconnect_time | |
seconds | |
else | |
max_reconnect_time | |
end | |
end | |
@reconnect_time | |
end | |
end | |
module Sensu | |
module Extension | |
# Setup some basic error handling and connection management. Climb on top
# of Sensu logging capability to log error states.
class ATT_RelayConnectionHandler < EM::Connection | |
# XXX: These should be runtime configurable. | |
MAX_RECONNECT_ATTEMPTS = 10 | |
MAX_RECONNECT_TIME = 300 # seconds | |
attr_accessor :message_queue, :connection_pool | |
attr_accessor :name, :host, :port, :connected | |
attr_accessor :reconnect_timer | |
# ignore :reek:TooManyStatements | |
def post_init | |
@is_closed = false | |
@connection_attempt_count = 0 | |
@max_reconnect_time = MAX_RECONNECT_TIME | |
@comm_inactivity_timeout = 0 # disable inactivity timeout | |
@pending_connect_timeout = 30 # seconds | |
@reconnect_timer = ATT_ExponentialDecayTimer.new | |
end | |
def connection_completed | |
@connected = true | |
end | |
def close_connection(*args) | |
@is_closed = true | |
@connected = false | |
super(*args) | |
end | |
def com_inactivity_timeout | |
logger.info("[AllTheThings] Connection to #{@name} timed out.") | |
schedule_reconnect | |
end | |
def unbind | |
@connected = false | |
unless @is_closed | |
logger.info('[AllTheThings] Connection closed unintentionally.') | |
schedule_reconnect | |
end | |
end | |
def send_data(*args) | |
super(*args) | |
end | |
# Override EM::Connection.receive_data to prevent it from calling | |
# puts and randomly logging nonsense to sensu-server.log | |
def receive_data(data) | |
end | |
# Reconnect normally attempts to connect at the end of the tick | |
# Delay the reconnect for some seconds. | |
def reconnect(time) | |
EM.add_timer(time) do | |
logger.info("[AllTheThings] Attempting to reconnect relay channel: #{@name}.") | |
super(@host, @port) | |
end | |
end | |
def get_reconnect_time | |
@reconnect_timer.get_reconnect_time( | |
@max_reconnect_time, | |
@connection_attempt_count | |
) | |
end | |
def schedule_reconnect | |
unless @connected | |
@connection_attempt_count += 1 | |
reconnect_time = get_reconnect_time | |
logger.info("[AllTheThings] Scheduling reconnect in #{@reconnect_time} seconds for relay channel #{@name}.") | |
reconnect(reconnect_time) | |
end | |
reconnect_time | |
end | |
def logger | |
Sensu::Logger.get | |
end | |
end # ATT_RelayConnectionHandler | |
# Endpoint
#
# An endpoint is a backend metric store. This is a compositional object
# to help keep the rest of the code sane.
class ATT_Endpoint | |
# EM:Connection.send_data batches network connection writes in 16KB | |
# We should start out by having all data in the queue flush in the | |
# space of a single loop tick. | |
MAX_QUEUE_SIZE = 16384 | |
attr_accessor :connection, :queue | |
def initialize(name, host, port, queue_size = MAX_QUEUE_SIZE) | |
@queue = [] | |
@connection = EM.connect(host, port, ATT_RelayConnectionHandler) | |
@connection.name = name | |
@connection.host = host | |
@connection.port = port | |
@connection.message_queue = @queue | |
EventMachine::PeriodicTimer.new(60) do | |
Sensu::Logger.get.info("[AllTheThings] relay queue size for #{name}: #{queue_length}") | |
end | |
end | |
def queue_length | |
@queue | |
.map(&:bytesize) | |
.reduce(:+) || 0 | |
end | |
def flush_to_net | |
sent = @connection.send_data(@queue.join) | |
@queue = [] if sent > 0 | |
end | |
def relay_event(data) | |
if @connection.connected | |
@queue << data | |
if queue_length >= MAX_QUEUE_SIZE | |
flush_to_net | |
Sensu::Logger.get_debug('[AllTheThings] relay.flush_to_net: successfully flushed to network') | |
end | |
end | |
end | |
def stop | |
if @connection.connected | |
flush_to_net | |
@connection.close_connection_after_writing | |
end | |
end | |
end # ATT_Endpoint | |
class AllTheThings < Bridge | |
def initialize | |
super | |
@endpoints = {} | |
@initialized = false | |
@mutators = { | |
graphite: method(:graphite), | |
influxdb: method(:influxdb), | |
opentsdb: method(:opentsdb), | |
elasticsearch: method(:elasticsearch) | |
} | |
end | |
# ignore :reek:LongMethod | |
def post_init | |
@settings[:allthethings].keys.each do |endpoint_name| | |
ep_name = endpoint_name.intern | |
ep_settings = @settings[:allthethings][ep_name] | |
if not ep_settings['enabled'] | |
next | |
end | |
@endpoints[ep_name] = ATT_Endpoint.new( | |
ep_name, | |
ep_settings['host'], | |
ep_settings['port'], | |
ep_settings['max_queue_size'] | |
) | |
#['standard', 'metrics'].each do |ep_type| | |
# if ep_settings.key?(ep_type) | |
# @endpoints[ep_type][ep_name] = ATT_Endpoint.new( | |
# ep_name, | |
# ep_type, | |
# ep_settings['host'], | |
# ep_settings['port'], | |
# ep_settings[ep_type], | |
# ep_settings['max_queue_size'] | |
# ) | |
# end | |
#end | |
#if ep_settings.key?('standard') | |
# @endpoints['standard'][ep_name] = ATT_Endpoint.new( | |
# ep_name, | |
# "standard", | |
# ep_settings['host'], | |
# ep_settings['port'], | |
# ep_settings['standard'], | |
# ep_settings['max_queue_size'] | |
# ) | |
#end | |
#if ep_settings.key?('metrics') | |
# @endpoints['metrics'][ep_name] = ATT_Endpoint.new( | |
# ep_name, | |
# "metrics", | |
# ep_settings['host'], | |
# ep_settings['port'], | |
# ep_settings['metrics'], | |
# ep_settings['max_queue_size'] | |
# ) | |
#end | |
end | |
end | |
def name | |
'all_the_things' | |
end | |
def description | |
'Writes standard checks and metrics to configured endpoints' | |
end | |
def options | |
return @options if @options | |
@options = { | |
graphite: { | |
enabled: false | |
}, | |
opentsdb: { | |
enabled: false | |
}, | |
influxdb: { | |
enabled: false | |
} | |
} | |
if @settings["allthethings"].is_a?(Hash) | |
@options.merge!(@settings["allthethings"]) | |
end | |
@options | |
end | |
def run(event, &block) | |
begin | |
logger.debug("[AllTheThings] all_the_things.run() handling event - #{event}") | |
if event[:check].key?(:type) | |
check_type = event[:check][:type] | |
else | |
check_type = "standard" | |
end | |
settings[:allthethings].keys.each do |endpoint_name| | |
if not settings[:allthethings][endpoint_name][:enabled] | |
next | |
end | |
ep_name = endpoint_name.intern | |
if @subscriptions.include?('bus.' << ep_name.to_s) | |
mutator = @mutators[ep_name] || next | |
mutated = mutate(mutator, ep_name, event) | |
#@endpoints[ep_name].relay_event(mutated) unless mutated.empty? | |
end | |
end | |
end | |
update_allthethings_file = Proc.new do | |
allthethings_json = MultiJson.dump(options) | |
File.open('/tmp/allthethings.json', 'w') do |file| | |
file.write(allthethings_json) | |
end | |
event_json = MultiJson.dump(event) | |
if event[:check].key?(:type) | |
check_type = event[:check][:type] | |
else | |
check_type = "standard" | |
end | |
File.open("/tmp/sensu_#{check_type}.json", 'w') do |file| | |
file.write(event_json) | |
end | |
["wrote config to /tmp/allthethings.json and event data to /tmp/sensu_#{check_type}.json", 0] | |
end | |
EM::defer(update_allthethings_file, block) | |
end | |
def stop | |
#@endpoints.each_key do |eptype| | |
# @endpoints[eptype].each_value do |ep| | |
# ep.stop | |
# end | |
#end | |
@endpoints.each_value do |ep| | |
ep.stop | |
end | |
yield if block_given? | |
end | |
def mutate(mutator, ep_name, event) | |
logger.debug("[AllTheThings] mutating for #{ep_name.inspect}") | |
check = event[:check] | |
output = check[:output] | |
output_type = check[:output_type] | |
endpoint_name = ep_name.to_s | |
if check.key?(:type) | |
check_type = check[:type] | |
else | |
check_type = "standard" | |
end | |
if check_type == "metric" | |
. | |
elsif check_type == "standard" | |
. | |
end | |
end | |
end | |
def graphite(event, check_type) | |
out = '' | |
if check_type == "standard" | |
out << "\"#{@event[:client][:name]}\"." | |
out << "\"#{@event[:check][:name]}\".#{@event[:check][:status]}" | |
out << "#{metric['name']}\t#{metric['value']}\t#{metric['timestamp']}\n" | |
elsif check_type == "metric" | |
out << "#{@event[:client][:name].split(/\./).first}." | |
out << "#{metric['name']}\t#{metric['value']}\t#{metric['timestamp']}\n" | |
end | |
out | |
end | |
def influxdb(event, check_type) | |
out = '' | |
out << "check.#{@event[:client][:name]}" | |
end | |
def logger | |
Sensu::Logger.get | |
end | |
end # AllTheThings < Bridge | |
end # module Extension | |
end # module Sensu |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment