Skip to content

Instantly share code, notes, and snippets.

@andrewvc
Last active July 18, 2016 19:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save andrewvc/afdb4100b4a63e8c524c33b1c3956fce to your computer and use it in GitHub Desktop.
Save andrewvc/afdb4100b4a63e8c524c33b1c3956fce to your computer and use it in GitHub Desktop.
./logstash_benchmarker.rb --warmup 60 --duration 120 -e "input { generator {} } output { elasticsearch {} }"
#!/usr/bin/env ruby
require 'clamp'
require 'pry'
require 'elasticsearch'
require 'thread'
Thread.abort_on_exception = true
# Common ancestor for metric reporters. Subclasses override #report to
# present the contents of a recorder.
class MetricReporterBase
  # Nothing in this class assigns @recorder, so this reads as nil unless a
  # subclass sets it.
  attr_reader :recorder

  # Default implementation: accepts a recorder and does nothing.
  def report(recorder)
    nil
  end
end
# Reporter that prints a recorder's statistics to standard output.
class ConsoleMetricReporter < MetricReporterBase
  # Prints a banner with the recorder's name, then, for every recorded
  # attribute, the attribute name followed by each statistic computed by
  # recorder.stats for that attribute's samples.
  #
  # recorder - object responding to #name, #recordings (attribute => samples)
  #            and #stats(samples) (returns a hash of statistic => value).
  def report(recorder)
    puts "=" * 80
    puts "#{recorder.name}"
    puts "-" * 80
    recorder.recordings.each do |attribute, values|
      # Label each group; without it, statistics belonging to different
      # attributes cannot be told apart in the output.
      puts "#{attribute}:"
      recorder.stats(values).each do |stat_name, stat_value|
        puts "  #{stat_name}: #{stat_value}"
      end
    end
  end
end
# Collects timestamped samples per attribute and computes summary statistics
# over them.
class MetricRecorder
  attr_reader :name, :recordings

  # name - label for this recorder (shown by reporters).
  def initialize(name)
    @name = name
    # Each attribute lazily gets its own array of [time, value] samples.
    @recordings = Hash.new { |hash, key| hash[key] = [] }
  end

  # Appends a sample for +attribute+, stamped with the current time.
  def record(attribute, value)
    @recordings[attribute] << [Time.now, value]
  end

  # Computes summary statistics for a list of [time, number] samples.
  #
  # tuples - Array of [Time, Numeric] pairs, in recording order.
  #
  # Returns a Hash with :samples, :duration_in_seconds (last timestamp minus
  # first timestamp), :sum, :average, :median (lower median, always an actual
  # sample), :min, :max, :ninety_fifth_percentile and
  # :ninety_ninth_percentile. For an empty list :samples is 0 and every other
  # entry is nil.
  def stats(tuples)
    length = tuples.size
    times = tuples.map { |tuple| tuple[0] }
    values = tuples.map { |tuple| tuple[1] }
    sorted = values.sort
    sum = values.reduce(:+)
    has_samples = length > 0

    {
      :samples => length,
      # Duration comes from the timestamps, not from the sampled values.
      :duration_in_seconds => has_samples ? times.last - times.first : nil,
      :sum => sum,
      :average => has_samples ? sum / length.to_f : nil,
      # (length - 1) / 2 is the middle index for odd sizes and the lower of
      # the two middle indices for even sizes.
      :median => has_samples ? sorted[(length - 1) / 2] : nil,
      :min => sorted.first,
      :max => sorted.last,
      :ninety_fifth_percentile => percentile(95, sorted),
      :ninety_ninth_percentile => percentile(99, sorted)
    }
  end

  # Nearest-rank percentile: the smallest sample such that at least
  # +percentile+ percent of the samples are less than or equal to it.
  #
  # percentile - Numeric percentage (e.g. 95).
  # values     - Array of comparable samples; need not be sorted.
  #
  # Returns the selected sample, or nil when there is no such rank (empty
  # input, or a percentile of 0).
  def percentile(percentile, values)
    # Multiply before dividing so integer percentiles avoid float round-off
    # (95 * 100 / 100.0 is exactly 95.0).
    rank = (percentile * values.size / 100.0).ceil
    return nil if rank < 1
    values.sort[rank - 1]
  end
end
# Base class for instruments: objects that sample some aspect of a benchmark
# run into a MetricRecorder. Subclasses override #start and #stop.
class Instrument
  attr_reader :recorder

  # runner_options - options hash handed over by the benchmark runner; kept
  #                  for subclasses.
  def initialize(runner_options)
    @runner_options = runner_options
    @recorder = MetricRecorder.new(name)
  end

  # Label used for this instrument's recorder: the concrete class name.
  def name
    self.class.name
  end

  # Hook invoked when measurement begins. InstrumentObserver calls this with
  # the pid of the benchmarked process, so the base hook accepts (and
  # ignores) it.
  def start(pid = nil); end

  # Stores one sample for +attribute+ in this instrument's recorder.
  def record(attribute, value)
    @recorder.record(attribute, value)
  end

  # Hook invoked when measurement ends. No-op by default.
  def stop; end

  # Hands this instrument's recorder to a reporter.
  #
  # reporter - object responding to #report(recorder); defaults to a
  #            ConsoleMetricReporter. (MetricRecorder itself has no #report
  #            method, so reporting has to go through a reporter.)
  def report(reporter = ConsoleMetricReporter.new)
    reporter.report(@recorder)
  end
end
# Runs a single block periodically on a background thread until stopped.
class Timer
  # Convenience constructor: creates a Timer, schedules the given block every
  # +interval_seconds+ and returns the Timer.
  def self.task(interval_seconds=1, &block)
    instance = new
    instance.every(interval_seconds, &block)
    instance
  end

  def initialize
    @running = true
  end

  # Schedules the given block to run roughly every +interval_seconds+. The
  # first run happens as soon as the background thread starts. Errors raised
  # by the block (StandardError) are written to stderr and do not stop the
  # timer.
  #
  # Raises ArgumentError if a task has already been scheduled on this Timer.
  # Returns the background Thread.
  def every(interval_seconds=1)
    raise ArgumentError, "Only one task allowed!" if @periodic_thread
    last_ran = nil
    @periodic_thread = Thread.new do
      while @running
        if !last_ran || (Time.now - last_ran) >= interval_seconds
          begin
            yield
          rescue => e
            # puts (not write) so consecutive errors land on separate lines.
            $stderr.puts "Error in timer task #{e.message} #{e.class} #{e.backtrace}"
          end
          last_ran = Time.now
        end
        # Short poll so #stop is noticed within a quarter of a second.
        sleep 0.25
      end
    end
  end

  # Signals the background loop to finish and waits for it to exit. Safe to
  # call when no task was ever scheduled.
  def stop
    @running = false
    @periodic_thread.join if @periodic_thread
  end
end
# Logstash 5.x only. Work in progress.
#
# Intended to sample, once per second, how many events have been processed
# since measurement began. The event-count source is not implemented yet.
class ThroughputInstrument < Instrument
  # Remembers the event count at start and schedules a once-per-second sample
  # of the count relative to that baseline.
  #
  # pid - pid of the benchmarked process as passed by InstrumentObserver;
  #       currently unused.
  def start(pid = nil)
    @offset = get_event_count
    @timer = Timer.task(1) do
      # Instrument#record needs an attribute name as well as the value.
      record(:event_count, get_event_count - @offset)
    end
  end

  # Stops the sampling timer, if one was started.
  def stop
    @timer.stop if @timer
  end
  # The hook was originally defined under the name `end`; keep that name
  # callable for any existing caller.
  alias_method :end, :stop

  # Returns the total number of events processed so far.
  #
  # NOTE(review): no implementation exists anywhere in this file; fail
  # explicitly until one is supplied.
  def get_event_count
    raise NotImplementedError, "#{self.class.name}#get_event_count is not implemented"
  end
end
# Measures throughput by counting indexed documents
class EsThroughputInstrument < Instrument
  # runner_opts - options hash; runner_opts[:es] is passed to the
  #               Elasticsearch client as its :host.
  def initialize(runner_opts)
    @es = Elasticsearch::Client.new(:host => runner_opts[:es])
    @target_indices = "logstash-*"
    super(runner_opts)
  end

  # Deletes the target indices so the count starts from a clean slate, then
  # samples the document count once per second.
  #
  # pid - pid of the benchmarked process; unused by this instrument.
  def start(pid)
    # Same pattern that #count queries, kept in one place.
    @es.indices.delete(:index => @target_indices)
    @timer = Timer.task(1) { record_count }
  end

  # Records the current document count under :doc_count.
  def record_count
    record(:doc_count, count)
  end

  # Returns the "count" field of the count response for the target indices.
  def count
    @es.count(:index => @target_indices)["count"]
  end

  # Stops the sampling timer, if #start created one.
  def stop
    @timer.stop if @timer
  end
end
# Samples, once per second, the number of `lsof -Pni` lines that mention the
# benchmarked pid and also match "9200".
class EsConnectionInstrument < Instrument
  # Remembers the pid to inspect and begins the once-per-second sampling.
  def start(pid)
    @pid = pid
    @timer = Timer.task(1) do
      record_connections
    end
  end

  # Ends the sampling started by #start.
  def stop
    @timer.stop
  end

  # Stores the current connection count under :es_open_connections.
  def record_connections
    connection_count = open_connections
    record(:es_open_connections, connection_count)
  end

  # Runs the lsof pipeline in a shell and returns its line count as an
  # Integer.
  def open_connections
    pipeline = "lsof -Pni | egrep \"\\s#{@pid}\\s\" | grep 9200 | wc -l"
    %x(#{pipeline}).to_i
  end
end
# Lifecycle hooks for a benchmark run. Every hook is a no-op here, so a
# subclass only needs to override the ones it cares about.
class BenchmarkObserverBase
  # Called before the Logstash process is spawned.
  def before_exec(logstash_path, *args)
  end

  # Called right after the process is spawned, with its pid.
  def after_exec(pid, logstash_path, *args)
  end

  # Called before the warmup sleep of +seconds+.
  def before_warmup(seconds)
  end

  # Called once the warmup sleep has finished.
  def after_warmup(seconds)
  end

  # Called before the measured period of +seconds+ begins.
  def before_start(seconds)
  end

  # Called when the measured period is over, before the process is stopped.
  def before_stop(seconds)
  end

  # Called after the process has been waited on.
  def after_stop(pid, exit_code)
  end
end
# Adapter that drives one instrument from the benchmark lifecycle hooks:
# the instrument is started when measurement begins and stopped when it ends.
class InstrumentObserver < BenchmarkObserverBase
  # instrument - object responding to #start(pid) and #stop.
  def initialize(instrument)
    @instrument = instrument
    @pid = nil
  end

  # Captures the spawned process id so it can be given to the instrument
  # later; the path and arguments are not used.
  def after_exec(pid, logstash_path, *args)
    @pid = pid
  end

  # Measurement is about to begin: start the instrument with the saved pid.
  def before_start(seconds)
    @instrument.start(@pid)
  end

  # Measurement is over: stop the instrument.
  def before_stop(seconds)
    @instrument.stop
  end
end
# Observer that narrates each stage of the benchmark run on standard output.
class ConsolePrinterObserver < BenchmarkObserverBase
  def before_exec(logstash_path, *args)
    say "Will execute: #{logstash_path} with options #{args}"
  end

  def after_exec(pid, logstash_path, *args)
    say "Started logstash with PID #{pid}"
  end

  def before_warmup(seconds)
    say "Will warmup for #{seconds} seconds before benchmarking"
  end

  def after_warmup(seconds)
    say "Warmup ended."
  end

  def before_start(seconds)
    say "Will measure process for #{seconds} seconds"
  end

  def before_stop(seconds)
    say "Finished measuring process"
  end

  def after_stop(pid, exit_code)
    say "Process #{pid} exited with status code #{exit_code}"
  end

  private

  # Single output point for all status lines.
  def say(message)
    puts message
  end
end
# Spawns a Logstash process, lets it warm up, keeps it running for the
# measurement period, stops it, and notifies observers at each stage.
class BenchmarkRun
  # logstash_path - executable to spawn.
  # logstash_args - Array of command-line arguments for it.
  # options       - Hash with :warmup and :duration (seconds to sleep) and
  #                 :observers (objects implementing the observer hooks).
  def initialize(logstash_path, logstash_args, options)
    @options = options
    @warmup = @options[:warmup]
    @duration = @options[:duration]
    @logstash_path = logstash_path
    @logstash_args = logstash_args
    @observers = options[:observers]
  end

  # Runs the whole benchmark and blocks until the spawned process has exited.
  # Observers receive before_exec, after_exec, before_warmup, after_warmup,
  # before_start, before_stop and finally after_stop(pid, exit_code).
  def execute
    notify(:before_exec, @logstash_path, *@logstash_args)
    @pid = spawn(@logstash_path, *@logstash_args)
    notify(:after_exec, @pid, @logstash_path, *@logstash_args)

    # The schedule runs on its own thread because waiting on the child blocks.
    driver = Thread.new { drive_schedule }

    # wait2 yields the Process::Status; plain Process.wait only returns the
    # pid, which is not an exit code.
    _, status = Process.wait2(@pid)

    # The child is gone and reaped: stop the schedule so it cannot later
    # signal a pid that no longer belongs to us.
    driver.kill if driver.alive?

    @exit_code = exit_code_for(status)
    notify(:after_stop, @pid, @exit_code)
  end

  # Asks the process to terminate (SIGTERM), polls for up to ~2.5 seconds,
  # and sends SIGKILL only if it is still alive after that. A process that
  # has already disappeared is not an error.
  def stop_process!
    Process.kill(15, @pid)
    10.times do
      return unless process_alive?
      sleep 0.25
    end
    Process.kill(9, @pid) if process_alive?
  rescue Errno::ESRCH
    # The process exited between the liveness check and the signal.
    nil
  end

  # True when signal 0 can be delivered to the pid, false when the pid no
  # longer exists.
  def process_alive?
    Process.kill(0, @pid)
    true
  rescue Errno::ESRCH
    false
  end

  private

  # Calls +hook+ with +args+ on every observer, in registration order.
  def notify(hook, *args)
    @observers.each { |observer| observer.public_send(hook, *args) }
  end

  # Warmup, measurement window, then shutdown. Errors are reported on stderr
  # instead of propagating out of the thread.
  def drive_schedule
    notify(:before_warmup, @warmup)
    sleep @warmup
    notify(:after_warmup, @warmup)
    notify(:before_start, @duration)
    sleep @duration
    notify(:before_stop, @duration)
    stop_process!
  rescue => e
    $stderr.write "Error while executing! #{e.message} #{e.class} #{e.backtrace}"
  end

  # Exit status of the process; when it was terminated by a signal (no exit
  # status available) falls back to the shell convention 128 + signal number.
  def exit_code_for(status)
    return status.exitstatus unless status.exitstatus.nil?
    status.termsig ? 128 + status.termsig : nil
  end
end
# Named benchmark profiles. Each profile lists the observer classes and the
# instrument classes to instantiate for a run. Frozen so a run cannot
# accidentally alter the shared definitions.
BENCHMARK_PROFILES = {
  :standard => {
    :observers => [ConsolePrinterObserver].freeze,
    :instruments => [EsThroughputInstrument, EsConnectionInstrument].freeze
  }.freeze
}.freeze
# Command-line front end: parses options, assembles observers and instruments
# for the chosen profile, runs the benchmark and prints each instrument's
# statistics.
class BenchmarkRunner < Clamp::Command
  # Option reader method => Logstash command-line flag it is forwarded as.
  LOGSTASH_FLAGS = {
    :config_string => "-e",
    :config_file => "-f",
    :logstash_workers => "-w",
    :logstash_log_to => "-l",
    :log_level => "--log.level"
  }.freeze

  option ["-e", "--config-string"], "CONFIG", "Config as string"
  option ["-f", "--config-file"], "FILENAME", "Config file path"
  option ["-w", "--logstash-workers"], "#", "# Logstash workers (-w)"
  option ["-l", "--logstash-log-to"], "PATH", "Logstash log location (-l)"
  option ["--log-level"], "LOG_LEVEL", "Logstash log level"
  option "--logstash-path", "PATH", "Path to Logstash binary", :default => "bin/logstash"
  option "--es", "URL", "Remote URL for Elasticsearch. Used for measuring throughput", :default => "http://localhost:9200"
  option "--run-name", "NAME", "Name for this run", :default => "Run #{Time.now}"
  option("--duration", "SECONDS", "Seconds to benchmark", :default => 300) do |s|
    Integer(s)
  end
  option("--warmup", "SECONDS", "Seconds to warmup", :default => 30) do |s|
    Integer(s)
  end
  option "--profile-name", "PROFILE", "Benchmark profile name. One of #{BENCHMARK_PROFILES.keys}", :default => "standard"

  # Builds the run from the selected profile, executes it, then reports every
  # instrument's recorder on the console. An unknown profile name is reported
  # as a usage error instead of failing later on a nil profile.
  def execute
    profile = BENCHMARK_PROFILES[profile_name.to_sym]
    unless profile
      signal_usage_error "Unknown profile '#{profile_name}'. Available profiles: #{BENCHMARK_PROFILES.keys.join(', ')}"
    end

    opts = {
      :name => run_name,
      :warmup => warmup,
      :duration => duration,
      :es => es
    }
    opts[:observers] = profile[:observers].map { |observer_class| observer_class.new }

    # Instruments receive the options hash; each one is then wired into the
    # run through an InstrumentObserver.
    instruments = profile[:instruments].map { |instrument_class| instrument_class.new(opts) }
    instruments.each do |instrument|
      opts[:observers] << InstrumentObserver.new(instrument)
    end

    BenchmarkRun.new(logstash_path, logstash_args, opts).execute

    reporter = ConsoleMetricReporter.new
    instruments.each { |instrument| reporter.report(instrument.recorder) }
  end

  # Returns the argument list for Logstash: a flag followed by its value for
  # every forwarded option that was actually given.
  def logstash_args
    LOGSTASH_FLAGS.each_with_object([]) do |(reader_name, flag), args|
      value = send(reader_name)
      args.push(flag, value) if value
    end
  end
end
# Script entry point: hand control to the BenchmarkRunner command defined
# above via its class-level run method (inherited from Clamp::Command).
BenchmarkRunner.run
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment