Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
### Stub for a logstash filter event
class Event
def initialize(initial_data=nil)
@data = initial_data || {}
end
def get(key)
@data[key]
end
def set(key, val)
@data[key] = val
end
def inspect
@data.inspect
end
end
# simulate an event coming from a logstash input to the filter
event = Event.new
event.set("host", "some_host")
# first document
if rand(100) < 50
value = Time.now.to_i * 5
event.set("metric_path", "Cassandra.Write.total_latency.count")
event.set("metric_value_number", value)
else
# second document
value = Time.now.to_i
event.set("metric_path", "Cassandra.Write.total.count")
event.set("metric_value_number", value)
end
### logstash ruby filter
# ruby {
# init => '
require 'yaml'
# '
# code => '
host = event.get("host")
scope = "write"
data_path = "/tmp/#{host}_jmx_#{scope}_logstash_filter.yml"
lock_path = "/tmp/#{host}_jmx_#{scope}_logstash_filter.lock"
data_file = File.new(data_path, "a+")
lock_file = File.new(lock_path, File::RDWR|File::CREAT)
begin
# acquire lock
# only allow one logstash worker to enter at a time
# per host scope, to avoid race conditions when there
# are multiple workers reading documents simultaneously
lock_file.flock(File::LOCK_EX)
data = YAML.load(data_file.read)
data ||= {"current" => {}, "previous" => {}}
reset_data = false
puts "event: #{event}"
puts "data from file: #{data}"
# set whichever new JMX metric we have in this event
event_type = event.get("metric_path")
event_value = event.get("metric_value_number")
event_total_latency = nil
event_total_request_count = nil
case event_type
when "Cassandra.Write.total_latency.count"
event_total_latency = event_value
when "Cassandra.Write.total.count"
event_total_request_count = event_value
else
raise "unexpected event type: #{event_type}"
end
data["current"]["total_latency"] = event_total_latency if event_total_latency
data["current"]["total_request_count"] = event_total_request_count if event_total_request_count
if data["current"]["ready"] == true
# someone else got here first and wrote the other
# current metric -- we are ready to perform calculation
# and shift current into previous
if data["previous"] && data["previous"].keys.count > 0
request_count = data["current"]["total_request_count"]
previous_request_count = data["previous"]["total_request_count"]
total_latency = data["current"]["total_latency"]
previous_total_latency = data["previous"]["total_latency"]
if request_count && previous_request_count && total_latency && previous_total_latency
request_count = request_count - previous_request_count
if request_count > 0
avg_latency = (total_latency - previous_total_latency) / request_count.to_f
# convert metric unit of usec to msec
avg_latency = avg_latency / 1000.0
# add calculated fields
event.set("avg_latency", avg_latency)
event.set("requests", request_count)
end
else
# if we don't have all expected values
# we're out of sync and need to start over
reset_data = true
tags = event.get("tags") || []
event.set("tags", tags << "_jmx_metric_order_failure")
end
else
# first run, there is no previous data
# to create calculateion; carry on.
puts "no previous data, continuing..."
end
data["previous"] = data["current"]
data["current"] = {}
else
# we are the first one to get here and will populate
# our value for the next event to perform the calculation
data["current"]["ready"] = true
end
if reset_data
data = {"current" => {}, "previous" => {}}
end
puts "data to file: #{data}"
data_file.rewind
data_file.truncate(0)
data_file.write(YAML.dump(data))
data_file.close
ensure
# release lock
lock_file.flock(File::LOCK_UN)
end
# '
# }
### ------------------------------
# print to console what would make it to elastic
puts "Logstash event for elastic:"
puts event.inspect
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.