Last active: July 18, 2016 19:02
-
-
Save andrewvc/afdb4100b4a63e8c524c33b1c3956fce to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Example usage: ./logstash_benchmarker.rb --warmup 60 --duration 120 -e "input { generator {} } output { elasticsearch {} }"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby
# Benchmarks a Logstash pipeline: spawns Logstash, waits through a warmup
# period, samples the configured instruments for a fixed duration, stops the
# process and prints summary statistics for every instrument.
require 'clamp'
require 'pry'
require 'elasticsearch'
require 'thread'

# An exception that escapes any background thread (timers, the benchmark
# schedule) should crash the benchmark instead of being silently lost.
Thread.abort_on_exception = true
# Abstract base for metric reporters. Subclasses override #report to render
# the data collected by a recorder; the base implementation does nothing.
class MetricReporterBase
  attr_reader :recorder

  # No-op reporting hook.
  # @param recorder [Object] the recorder whose data would be reported
  # @return [nil]
  def report(recorder)
    nil
  end
end
# Prints a recorder's summary statistics to stdout, one labelled section per
# recorded attribute.
class ConsoleMetricReporter < MetricReporterBase
  # @param recorder [#name, #recordings, #stats] e.g. a MetricRecorder; its
  #   #recordings maps attribute => samples and #stats summarizes one series
  def report(recorder)
    puts "=" * 80
    puts recorder.name
    puts "-" * 80
    recorder.recordings.each do |attribute, values|
      # Label each series: without it, a recorder holding several attributes
      # would print indistinguishable blocks of numbers.
      puts "#{attribute}:"
      recorder.stats(values).each do |stat, value|
        puts "  #{stat}: #{value}"
      end
    end
  end
end
# Collects timestamped samples per attribute and summarizes them.
class MetricRecorder
  attr_reader :name, :recordings

  # @param name [String] label for this recorder (the owning instrument's name)
  def initialize(name)
    @name = name
    # Block default so every attribute gets its own array.
    @recordings = Hash.new { |h, k| h[k] = [] }
  end

  # Appends a [Time, value] sample for +attribute+.
  def record(attribute, value)
    @recordings[attribute] << [Time.now, value]
  end

  # Summary statistics for a series of [time, number] tuples in recording order.
  #
  # @param tuples [Array<Array>] [time, value] pairs, oldest first
  # @return [Hash] :samples, :duration_in_seconds (time between the first and
  #   last sample), :sum, :average, :median, :min, :max and the 95th/99th
  #   nearest-rank percentiles. For an empty series :samples and :sum are 0 and
  #   every other statistic is nil.
  def stats(tuples)
    values = tuples.map { |_time, value| value }
    sorted = values.sort
    sum = values.reduce(0, :+)
    {
      :samples => values.size,
      # Elapsed time between samples — the timestamps, not the recorded values.
      :duration_in_seconds => tuples.empty? ? nil : tuples.last[0] - tuples.first[0],
      :sum => sum,
      :average => values.empty? ? nil : sum / values.size.to_f,
      :median => median(sorted),
      :min => sorted.first,
      :max => sorted.last,
      :ninety_fifth_percentile => percentile(95, sorted),
      :ninety_ninth_percentile => percentile(99, sorted)
    }
  end

  # Nearest-rank percentile: the smallest value such that at least
  # +percentile+ percent of +values+ are less than or equal to it.
  #
  # @param percentile [Numeric] 0..100
  # @param values [Array<Numeric>] samples, in any order
  # @return [Numeric, nil] nil when +values+ is empty
  def percentile(percentile, values)
    return nil if values.empty?
    # Multiply before dividing so integer percentiles yield an exact rank.
    rank = (percentile * values.size / 100.0).ceil
    rank = [[rank, 1].max, values.size].min
    values.sort[rank - 1]
  end

  private

  # Middle element of an already-sorted array; the mean of the two middle
  # elements when the size is even. nil for an empty array.
  def median(sorted)
    return nil if sorted.empty?
    mid = sorted.size / 2
    sorted.size.odd? ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2.0
  end
end
# Base class for measurements taken while a benchmark runs. Each instrument
# owns a MetricRecorder named after its class; subclasses override #start and
# #stop to sample data and call #record.
class Instrument
  attr_reader :recorder

  # @param runner_options [Hash] benchmark options (subclasses read e.g. :es)
  def initialize(runner_options)
    @runner_options = runner_options
    @recorder = MetricRecorder.new(self.name)
  end

  # @return [String] the concrete class name, used as the recorder's label
  def name
    self.class.name
  end

  # Begins measuring. InstrumentObserver calls this as start(pid), so the
  # optional +pid+ (the benchmarked process) keeps every instrument compatible.
  def start(pid = nil); end

  # Stores one sample for +attribute+ in this instrument's recorder.
  def record(attribute, value)
    @recorder.record(attribute, value)
  end

  # Ends measuring.
  def stop; end

  # Renders this instrument's recordings. MetricRecorder has no #report of its
  # own, so the rendering is delegated to a reporter object.
  # @param reporter [#report] defaults to a ConsoleMetricReporter
  def report(reporter = ConsoleMetricReporter.new)
    reporter.report(@recorder)
  end
end
# Runs one block repeatedly on a background thread, at most once per interval.
class Timer
  # How often the worker thread wakes to check the interval and the stop flag;
  # this also bounds how long #stop may wait.
  POLL_SECONDS = 0.25

  # Creates a timer and immediately schedules +block+ every +interval_seconds+.
  # @return [Timer]
  def self.task(interval_seconds = 1, &block)
    instance = new
    instance.every(interval_seconds, &block)
    instance
  end

  def initialize
    @running = true
  end

  # Starts the worker thread. The block runs immediately, then again whenever
  # at least +interval_seconds+ have elapsed since its previous run finished.
  # StandardErrors raised by the block are written to stderr and do not stop
  # the timer.
  #
  # @raise [ArgumentError] if a task is already scheduled or no block is given
  def every(interval_seconds = 1, &block)
    raise ArgumentError, "Only one task allowed!" if @periodic_thread
    raise ArgumentError, "A block is required" unless block
    last_ran = nil
    @periodic_thread = Thread.new do
      while @running
        # Monotonic time is immune to wall-clock jumps (NTP, DST).
        if last_ran.nil? || monotonic_now - last_ran >= interval_seconds
          begin
            block.call
          rescue => e
            $stderr.puts "Error in timer task: #{e.message} (#{e.class})\n#{Array(e.backtrace).join("\n")}"
          end
          last_ran = monotonic_now
        end
        sleep POLL_SECONDS
      end
    end
  end

  # Signals the worker to finish and waits for it. Safe to call on a timer
  # that was never started.
  def stop
    @running = false
    @periodic_thread.join if @periodic_thread
  end

  private

  def monotonic_now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
# Logstash 5.x only. Work in progress.
# Records, once per second, how many events Logstash has processed since
# #start. #get_event_count has no implementation yet; a subclass must
# provide it.
class ThroughputInstrument < Instrument
  # @param pid [Integer, nil] accepted because InstrumentObserver calls
  #   start(pid); unused here
  def start(pid = nil)
    @offset = get_event_count
    @timer = Timer.task(1) do
      record(:event_count, get_event_count - @offset)
    end
  end

  def stop
    @timer.stop if @timer
  end
  # Backward compatibility: this method was originally (mis)named `end`.
  alias_method :end, :stop

  # Total number of events Logstash has processed so far.
  # NOTE(review): intended to query the Logstash monitoring API — not written yet.
  # @raise [NotImplementedError]
  def get_event_count
    raise NotImplementedError, "#{self.class.name}#get_event_count is not implemented"
  end
end
# Measures throughput by polling, once per second, the number of documents in
# the Logstash indices of Elasticsearch.
class EsThroughputInstrument < Instrument
  # Index pattern that is cleared on #start and counted while running.
  TARGET_INDICES = "logstash-*"

  # @param runner_opts [Hash] reads :es, the Elasticsearch URL handed to the
  #   client as :host
  def initialize(runner_opts)
    @es = Elasticsearch::Client.new(:host => runner_opts[:es])
    @target_indices = TARGET_INDICES
    super
  end

  # Deletes the target indices so counting starts from zero, then begins
  # sampling.
  # @param pid [Integer, nil] unused; accepted for InstrumentObserver
  def start(pid = nil)
    # ignore: 404 — a fresh cluster may have nothing to delete yet.
    @es.indices.delete(:index => @target_indices, :ignore => 404)
    @timer = Timer.task(1) { record_count }
  end

  def record_count
    record(:doc_count, count)
  end

  # @return [Integer] current document count across the target indices
  def count
    @es.count(:index => @target_indices)["count"]
  end

  def stop
    @timer.stop if @timer
  end
end
# Samples, once per second, how many TCP sockets the benchmarked process holds
# on the Elasticsearch port. Requires the `lsof` command.
class EsConnectionInstrument < Instrument
  # Port assumed when the :es URL has no explicit port.
  DEFAULT_ES_PORT = 9200

  # @param pid [Integer] PID of the process to inspect
  def start(pid)
    @pid = pid
    @timer = Timer.task(1) { record_connections }
  end

  def record_connections
    record(:es_open_connections, open_connections)
  end

  # Number of TCP sockets owned by @pid with the Elasticsearch port on either
  # end. `-a` ANDs the -p and -i selections, so only that process's sockets on
  # that port are listed. This avoids the old text grep for "9200", which also
  # matched unrelated ports such as 49200. The command runs without a shell.
  def open_connections
    cmd = ["lsof", "-a", "-n", "-P", "-p", @pid.to_s, "-i", "TCP:#{es_port}"]
    lines = IO.popen(cmd, :err => File::NULL, &:readlines)
    # lsof prints a column header before any matches, and nothing when none.
    [lines.size - 1, 0].max
  end

  # Port taken from the :es URL (e.g. "http://localhost:9200"), falling back
  # to DEFAULT_ES_PORT.
  def es_port
    url = @runner_options && @runner_options[:es]
    port = url.to_s[%r{:(\d+)(?:/|\z)}, 1]
    port ? port.to_i : DEFAULT_ES_PORT
  end

  def stop
    @timer.stop if @timer
  end
end
# Lifecycle hooks that BenchmarkRun calls on each observer. Every hook is a
# no-op returning nil, so subclasses override only the events they need.
class BenchmarkObserverBase
  # Just before the process is spawned.
  def before_exec(logstash_path, *args)
    nil
  end

  # Right after the process has been spawned.
  def after_exec(pid, logstash_path, *args)
    nil
  end

  # Before the warmup sleep begins.
  def before_warmup(seconds)
    nil
  end

  # After the warmup sleep ends.
  def after_warmup(seconds)
    nil
  end

  # Before the measurement window begins.
  def before_start(seconds)
    nil
  end

  # When the measurement window ends, before the process is stopped.
  def before_stop(seconds)
    nil
  end

  # After the process has exited.
  def after_stop(pid, exit_code)
    nil
  end
end
# Connects one Instrument to the benchmark lifecycle: it is started with the
# spawned PID when measurement begins and stopped when measurement ends.
class InstrumentObserver < BenchmarkObserverBase
  def initialize(instrument)
    @instrument = instrument
  end

  # Remember the spawned PID; the instrument receives it in #before_start.
  def after_exec(pid, _logstash_path, *_args)
    @pid = pid
  end

  def before_start(_seconds)
    @instrument.start(@pid)
  end

  def before_stop(_seconds)
    @instrument.stop
  end
end
# Prints a human-readable status line on stdout for each benchmark phase.
class ConsolePrinterObserver < BenchmarkObserverBase
  def before_exec(logstash_path, *args)
    say "Will execute: #{logstash_path} with options #{args}"
  end

  def after_exec(pid, _logstash_path, *_args)
    say "Started logstash with PID #{pid}"
  end

  def before_warmup(seconds)
    say "Will warmup for #{seconds} seconds before benchmarking"
  end

  def after_warmup(_seconds)
    say "Warmup ended."
  end

  def before_start(seconds)
    say "Will measure process for #{seconds} seconds"
  end

  def before_stop(_seconds)
    say "Finished measuring process"
  end

  def after_stop(pid, exit_code)
    say "Process #{pid} exited with status code #{exit_code}"
  end

  private

  # Single output point for every status line.
  def say(message)
    $stdout.puts(message)
  end
end
# Spawns the process under test, drives the warmup -> measure -> stop schedule
# and notifies observers at every phase.
class BenchmarkRun
  # Seconds to wait after SIGTERM before escalating to SIGKILL.
  DEFAULT_STOP_GRACE_SECONDS = 2.5
  # How often to check whether the process has exited while waiting.
  STOP_POLL_SECONDS = 0.25

  # PID of the spawned process, and its exit status code once it has exited
  # (nil when it was terminated by a signal).
  attr_reader :pid, :exit_code

  # @param logstash_path [String] executable to spawn
  # @param logstash_args [Array<String>] arguments passed to it
  # @param options [Hash] :warmup and :duration in seconds, and :observers
  #   (objects implementing the BenchmarkObserverBase hooks)
  def initialize(logstash_path, logstash_args, options)
    @options = options
    @warmup = @options[:warmup]
    @duration = @options[:duration]
    @logstash_path = logstash_path
    @logstash_args = logstash_args
    @observers = options[:observers]
  end

  # Runs the benchmark until the spawned process exits.
  def execute
    notify(:before_exec, @logstash_path, *@logstash_args)
    @pid = spawn(@logstash_path, *@logstash_args)
    notify(:after_exec, @pid, @logstash_path, *@logstash_args)
    # The schedule needs its own thread because Process.wait2 blocks this one.
    schedule = Thread.new { run_schedule }
    # Process.wait returns the PID, not the exit code; read it from the status.
    _, status = Process.wait2(@pid)
    @exit_code = status.exitstatus
    # If the process exited before the schedule finished, stop the schedule so
    # it cannot later signal a PID that has already been reaped.
    schedule.kill if schedule.alive?
    notify(:after_stop, @pid, @exit_code)
  end

  # Sends SIGTERM, waits up to +grace_seconds+ for the process to exit, then
  # sends SIGKILL only if it is still running.
  def stop_process!(grace_seconds = DEFAULT_STOP_GRACE_SECONDS)
    Process.kill(:TERM, @pid)
    (grace_seconds / STOP_POLL_SECONDS).ceil.times do
      break unless process_alive?
      sleep STOP_POLL_SECONDS
    end
    Process.kill(:KILL, @pid) if process_alive?
  rescue Errno::ESRCH
    # The process exited between checks; there is nothing left to stop.
    nil
  end

  def process_alive?
    Process.kill(0, @pid)
    true
  rescue Errno::ESRCH
    false
  end

  private

  # Calls hook +event+ on every observer, in order.
  def notify(event, *args)
    @observers.each { |observer| observer.public_send(event, *args) }
  end

  # Warmup, measurement and shutdown, with observer notifications in between.
  def run_schedule
    notify(:before_warmup, @warmup)
    sleep @warmup
    notify(:after_warmup, @warmup)
    notify(:before_start, @duration)
    sleep @duration
    notify(:before_stop, @duration)
    stop_process!
  rescue => e
    $stderr.puts "Error while executing! #{e.message} #{e.class} #{e.backtrace}"
    # Still stop the process: otherwise Process.wait2 in #execute would block
    # forever on a process that is never told to exit.
    stop_process!
  end
end
# Named benchmark setups selectable with --profile-name. Each one lists the
# observer classes that print progress and the instrument classes that take
# measurements.
BENCHMARK_PROFILES = {
  standard: {
    observers: [ConsolePrinterObserver],
    instruments: [EsThroughputInstrument, EsConnectionInstrument]
  }
}
# Command-line entry point: parses options, builds the selected profile's
# observers and instruments, runs one benchmark and prints every instrument's
# statistics.
class BenchmarkRunner < Clamp::Command
  # Options forwarded to Logstash, mapped to its CLI flags in this order.
  LOGSTASH_FLAGS = {
    :config_string => "-e",
    :config_file => "-f",
    :logstash_workers => "-w",
    :logstash_log_to => "-l",
    :log_level => "--log.level"
  }.freeze

  option ["-e", "--config-string"], "CONFIG", "Config as string"
  option ["-f", "--config-file"], "FILENAME", "Config file path"
  option ["-w", "--logstash-workers"], "COUNT", "Number of Logstash workers (-w)"
  option ["-l", "--logstash-log-to"], "PATH", "Logstash log location (-l)"
  option ["--log-level"], "LOG_LEVEL", "Logstash log level"
  option "--logstash-path", "PATH", "Path to Logstash binary", :default => "bin/logstash"
  option "--es", "URL", "Remote URL for Elasticsearch. Used for measuring throughput", :default => "http://localhost:9200"
  option "--run-name", "NAME", "Name for this run", :default => "Run #{Time.now}"
  option("--duration", "SECONDS", "Seconds to benchmark", :default => 300) do |s|
    Integer(s)
  end
  option("--warmup", "SECONDS", "Seconds to warmup", :default => 30) do |s|
    Integer(s)
  end
  option("--profile-name", "PROFILE", "Benchmark profile name. One of #{BENCHMARK_PROFILES.keys}", :default => "standard") do |s|
    # Validate here so Clamp reports an unknown profile as a usage error
    # instead of crashing later with NoMethodError on nil.
    unless BENCHMARK_PROFILES.keys.map(&:to_s).include?(s)
      raise ArgumentError, "unknown profile #{s.inspect}"
    end
    s
  end

  def execute
    opts = {
      :name => run_name,
      :warmup => warmup,
      :duration => duration,
      :es => es
    }
    profile = BENCHMARK_PROFILES.fetch(profile_name.to_sym)
    opts[:observers] = profile[:observers].map(&:new)
    instruments = profile[:instruments].map { |inst_class| inst_class.new(opts) }
    # Each instrument is driven by the run's lifecycle through an observer.
    opts[:observers].concat(instruments.map { |instrument| InstrumentObserver.new(instrument) })

    BenchmarkRun.new(logstash_path, logstash_args, opts).execute

    reporter = ConsoleMetricReporter.new
    instruments.each { |instrument| reporter.report(instrument.recorder) }
  end

  # @return [Array<String>] the Logstash flag/value pairs for every option set
  def logstash_args
    LOGSTASH_FLAGS.flat_map do |option_name, flag|
      value = send(option_name)
      value ? [flag, value] : []
    end
  end
end
# Run the CLI only when this file is executed directly, so the classes above
# can also be required (e.g. from tests) without starting a benchmark.
BenchmarkRunner.run if $PROGRAM_NAME == __FILE__
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment