Skip to content

Instantly share code, notes, and snippets.

@andrewvc andrewvc/example.sh
Last active Jul 18, 2016

Embed
What would you like to do?
./logstash_benchmarker.rb --warmup 60 --duration 120 -e "input { generator {} } output { elasticsearch {} }"
#!/usr/bin/env ruby
require 'clamp'
require 'pry'
require 'elasticsearch'
require 'thread'
# Let an unhandled exception in any thread take the whole process down rather
# than letting that thread die on its own.
Thread.abort_on_exception = true
# Base class for metric reporters. The default implementation reports nothing;
# subclasses override #report to present a recorder's data.
class MetricReporterBase
  attr_reader :recorder

  # Hook for subclasses. Accepts a recorder and does nothing with it.
  #
  # @param recorder [Object] the recorder whose data should be reported
  # @return [nil]
  def report(recorder)
    nil
  end
end
# Reporter that writes a recorder's statistics to standard output.
class ConsoleMetricReporter < MetricReporterBase
  # Prints a banner containing the recorder's name, then one "key: value" line
  # per statistic for every attribute the recorder holds.
  #
  # @param recorder [#name, #recordings, #stats] source of the samples
  def report(recorder)
    heavy_rule = "=" * 80
    light_rule = "-" * 80

    puts heavy_rule
    puts "#{recorder.name}"
    puts light_rule

    # The attribute key itself is not printed, only its statistics.
    recorder.recordings.each do |_attribute, samples|
      summary = recorder.stats(samples)
      summary.each { |label, figure| puts "#{label}: #{figure}" }
    end
  end
end
# Collects timestamped samples per attribute and derives summary statistics
# from them.
class MetricRecorder
  attr_reader :name, :recordings

  # @param name [String] label identifying this recorder in reports
  def initialize(name)
    @name = name
    # Every attribute lazily gets its own array of [Time, value] tuples.
    @recordings = Hash.new { |h, k| h[k] = [] }
  end

  # Appends a sample for +attribute+, stamped with the current time.
  #
  # @param attribute [Object] key the sample is filed under
  # @param value [Numeric] the sampled value
  def record(attribute, value)
    @recordings[attribute] << [Time.now, value]
  end

  # Summary statistics for a list of [time, number] tuples.
  #
  # The duration is the span between the first and last timestamp (not between
  # the sampled values). The median is the middle element of the sorted values;
  # for an even number of samples the lower of the two middle elements is used.
  #
  # @param tuples [Array<Array(Time, Numeric)>] samples in recording order
  # @return [Hash] statistics keyed by symbol; for an empty input every
  #   value-derived entry is nil and the counters are zero
  def stats(tuples)
    length = tuples.size
    if length.zero?
      return {
        :samples => 0,
        :duration_in_seconds => 0,
        :sum => 0,
        :average => nil,
        :median => nil,
        :min => nil,
        :max => nil,
        :ninety_fifth_percentile => nil,
        :ninety_ninth_percentile => nil
      }
    end

    times = tuples.map { |tuple| tuple[0] }
    values = tuples.map { |tuple| tuple[1] }
    sorted = values.sort
    sum = values.reduce(:+)

    {
      :samples => length,
      :duration_in_seconds => times.last - times.first,
      :sum => sum,
      :average => sum / length.to_f,
      :median => sorted[(length - 1) / 2],
      :min => sorted.first,
      :max => sorted.last,
      :ninety_fifth_percentile => percentile(95, values),
      :ninety_ninth_percentile => percentile(99, values)
    }
  end

  # Nearest-rank percentile: the smallest value such that at least
  # +percentile+ percent of the samples are less than or equal to it.
  #
  # @param percentile [Numeric] requested percentile
  # @param values [Array<Numeric>] unsorted samples
  # @return [Numeric, nil] nil only when +values+ is empty
  def percentile(percentile, values)
    return nil if values.empty?

    rank = (values.size * percentile / 100.0).ceil
    # Keep the rank inside 1..size so tiny sample sets still yield a value.
    rank = [[rank, 1].max, values.size].min
    values.sort[rank - 1]
  end
end
# Base class for benchmark instruments. An instrument owns a MetricRecorder
# named after its class and records samples into it between #start and #stop.
class Instrument
  attr_reader :recorder

  # @param runner_options [Hash] options of the benchmark run
  def initialize(runner_options)
    @runner_options = runner_options
    @recorder = MetricRecorder.new(name)
  end

  # @return [String] the instrument's class name, used as the recorder label
  def name
    self.class.name
  end

  # Hook invoked when measurement begins. InstrumentObserver passes the pid of
  # the benchmarked process, so the base hook accepts (and ignores) it.
  #
  # @param pid [Integer, nil] pid of the process being measured
  def start(pid = nil); end

  # Files a sample with the recorder.
  #
  # @param attribute [Object] key the sample is filed under
  # @param value [Numeric] the sampled value
  def record(attribute, value)
    #puts "Record #{self.class.name} #{attribute}, #{value}"
    @recorder.record(attribute, value)
  end

  # Hook invoked when measurement ends.
  def stop; end

  # Hands this instrument's recorder to a reporter. MetricRecorder has no
  # report method of its own, so the reporting is delegated.
  #
  # @param reporter [#report] defaults to a ConsoleMetricReporter
  def report(reporter = ConsoleMetricReporter.new)
    reporter.report(@recorder)
  end
end
# Runs a single block repeatedly on a background thread until stopped.
class Timer
  # Creates a timer and immediately schedules the given block on it.
  #
  # @param interval_seconds [Numeric] minimum time between two runs
  # @yield the work to perform on every tick
  # @return [Timer] the running timer; call #stop to end it
  def self.task(interval_seconds = 1, &block)
    instance = new
    instance.every(interval_seconds, &block)
    instance
  end

  def initialize
    @running = true
  end

  # Starts the background thread. The block runs once right away and then
  # whenever at least +interval_seconds+ have passed since the previous run.
  # The thread wakes every 0.25s to check, which bounds both the scheduling
  # granularity and how long #stop can block.
  #
  # @param interval_seconds [Numeric] minimum time between two runs
  # @raise [ArgumentError] if a task has already been scheduled on this timer
  # @return [Thread] the thread executing the task
  def every(interval_seconds = 1)
    raise ArgumentError, "Only one task allowed!" if @periodic_thread

    last_ran = nil
    @periodic_thread = Thread.new do
      while @running
        if !last_ran || (Time.now - last_ran) >= interval_seconds
          begin
            yield
          rescue => e
            # A failing tick is reported but must not end the timer.
            $stderr.write "Error in timer task #{e.message} #{e.class} #{e.backtrace}"
          end
          last_ran = Time.now
        end
        sleep 0.25
      end
    end
  end

  # Asks the background thread to finish and waits for it. Safe to call on a
  # timer that never had a task scheduled.
  def stop
    @running = false
    @periodic_thread.join if @periodic_thread
  end
end
# Logstash 5.x only. Work in progress.
#
# Records, once per second, how many events have been counted since #start.
#
# NOTE(review): get_event_count is not defined anywhere in this file; it has to
# be supplied before this instrument can be used.
class ThroughputInstrument < Instrument
  # Remembers the event count at the start and begins sampling every second.
  #
  # @param pid [Integer, nil] pid handed over by InstrumentObserver; unused
  def start(pid = nil)
    @offset = get_event_count
    @timer = Timer.task(1) do
      # record needs an attribute key as well as the value.
      record(:event_count, get_event_count - @offset)
    end
  end

  # Ends the periodic sampling. Does nothing if #start was never called.
  def stop
    @timer.stop if @timer
  end

  # The teardown used to be defined under the name +end+, which never overrode
  # Instrument#stop; the old name is kept so existing callers keep working.
  alias_method :end, :stop
end
# Measures throughput by counting indexed documents.
#
# Once per second the document count of the "logstash-*" indices is recorded
# under the :doc_count attribute.
class EsThroughputInstrument < Instrument
  # @param runner_opts [Hash] run options; :es is passed to the client as host
  def initialize(runner_opts)
    @es = Elasticsearch::Client.new(:host => runner_opts[:es])
    @target_indices = "logstash-*"
    super
  end

  # Deletes the target indices, then begins sampling every second.
  #
  # @param pid [Integer] pid of the benchmarked process; unused here
  def start(pid)
    @es.indices.delete(:index => @target_indices)
    @timer = Timer.task(1) do
      record_count
    end
  end

  # Records the current document count as one :doc_count sample.
  def record_count
    doc_total = count
    record(:doc_count, doc_total)
  end

  # @return the "count" field of the count response for the target indices
  def count
    response = @es.count(:index => @target_indices)
    response["count"]
  end

  # Ends the periodic sampling.
  def stop
    @timer.stop
  end
end
# Records, once per second, how many lsof lines for the benchmarked pid mention
# 9200, under the :es_open_connections attribute.
class EsConnectionInstrument < Instrument
  # Remembers the pid to inspect and begins sampling every second.
  #
  # @param pid [Integer] pid of the benchmarked process
  def start(pid)
    @pid = pid
    @timer = Timer.task(1) do
      record_connections
    end
  end

  # Records the current connection count as one :es_open_connections sample.
  def record_connections
    connection_total = open_connections
    record(:es_open_connections, connection_total)
  end

  # Shells out to lsof and counts the lines that contain the pid surrounded by
  # whitespace and also contain "9200".
  #
  # @return [Integer] number of matching lines (0 if the output is not numeric)
  def open_connections
    pipeline = "lsof -Pni | egrep \"\\s#{@pid}\\s\" | grep 9200 | wc -l"
    output = `#{pipeline}`
    output.to_i
  end

  # Ends the periodic sampling.
  def stop
    @timer.stop
  end
end
# Lifecycle hooks of a benchmark run. All methods are optional: every hook
# defaults to doing nothing, so subclasses override only what they need.
class BenchmarkObserverBase
  # Called before the Logstash process is spawned.
  def before_exec(logstash_path, *args)
    nil
  end

  # Called right after the Logstash process was spawned.
  def after_exec(pid, logstash_path, *args)
    nil
  end

  # Called before the warmup period.
  def before_warmup(seconds)
    nil
  end

  # Called after the warmup period.
  def after_warmup(seconds)
    nil
  end

  # Called before the measured period.
  def before_start(seconds)
    nil
  end

  # Called after the measured period, before the process is stopped.
  def before_stop(seconds)
    nil
  end

  # Called once the process has exited.
  def after_stop(pid, exit_code)
    nil
  end
end
# Adapter that drives one instrument from the benchmark lifecycle: the
# instrument is started when measuring begins and stopped when it ends.
class InstrumentObserver < BenchmarkObserverBase
  # @param instrument [#start, #stop] the instrument to drive
  def initialize(instrument)
    @instrument = instrument
  end

  # Remembers the pid of the spawned process for the later #before_start.
  def after_exec(pid, logstash_path, *args)
    @pid = pid
  end

  # Starts the instrument, handing it the remembered pid.
  def before_start(seconds)
    measured_pid = @pid
    @instrument.start(measured_pid)
  end

  # Stops the instrument.
  def before_stop(seconds)
    @instrument.stop
  end
end
# Prints a status line to standard output for every lifecycle event.
class ConsolePrinterObserver < BenchmarkObserverBase
  def before_exec(logstash_path, *args)
    announce "Will execute: #{logstash_path} with options #{args}"
  end

  def after_exec(pid, logstash_path, *args)
    announce "Started logstash with PID #{pid}"
  end

  def before_warmup(seconds)
    announce "Will warmup for #{seconds} seconds before benchmarking"
  end

  def after_warmup(seconds)
    announce "Warmup ended."
  end

  def before_start(seconds)
    announce "Will measure process for #{seconds} seconds"
  end

  def before_stop(seconds)
    announce "Finished measuring process"
  end

  def after_stop(pid, exit_code)
    announce "Process #{pid} exited with status code #{exit_code}"
  end

  private

  # Single place through which every status line is written.
  def announce(message)
    puts message
  end
end
# Spawns Logstash, lets it warm up, measures it for a fixed duration, stops it
# and notifies observers at every step.
class BenchmarkRun
  # @param logstash_path [String] executable to spawn
  # @param logstash_args [Array<String>] arguments passed to the executable
  # @param options [Hash] must provide :warmup and :duration (seconds) and
  #   :observers (objects responding to the BenchmarkObserverBase hooks)
  def initialize(logstash_path, logstash_args, options)
    @options = options
    @warmup = @options[:warmup]
    @duration = @options[:duration]
    @logstash_path = logstash_path
    @logstash_args = logstash_args
    @observers = options[:observers]
  end

  # Runs the benchmark and blocks until the spawned process has exited.
  # Observers receive the process' real exit status in after_stop; it is nil
  # when the process did not exit normally (for example when it was killed by
  # a signal).
  def execute
    notify(:before_exec, @logstash_path, *@logstash_args)
    @pid = spawn(@logstash_path, *@logstash_args)
    notify(:after_exec, @pid, @logstash_path, *@logstash_args)

    # This needs to be in another thread because Process.wait2 will block
    Thread.new { run_measurement_window }

    # Process.wait only returns the pid; wait2 also yields the exit status.
    _waited_pid, status = Process.wait2(@pid)
    @exit_code = status.exitstatus
    notify(:after_stop, @pid, @exit_code)
  end

  # Asks the process to terminate (SIGTERM), gives it up to ten 0.25s polls to
  # go away and only then forces it down with SIGKILL. A process that is
  # already gone is not an error.
  def stop_process!
    Process.kill("TERM", @pid)
    10.times do
      break unless process_alive?
      sleep 0.25
    end
    Process.kill("KILL", @pid) if process_alive?
  rescue Errno::ESRCH
    # The process exited between the liveness check and the signal.
    nil
  end

  # @return [Boolean] whether a process with the spawned pid still exists
  def process_alive?
    # Signal 0 performs the existence check without delivering a signal.
    Process.kill(0, @pid)
    true
  rescue Errno::ESRCH
    false
  end

  private

  # Calls the given hook with +args+ on every observer, in order.
  def notify(hook, *args)
    @observers.each { |observer| observer.public_send(hook, *args) }
  end

  # Warmup, measurement and shutdown sequence executed on the helper thread.
  def run_measurement_window
    notify(:before_warmup, @warmup)
    sleep @warmup
    notify(:after_warmup, @warmup)
    notify(:before_start, @duration)
    sleep @duration
    notify(:before_stop, @duration)
    stop_process!
  rescue => e
    $stderr.write "Error while executing! #{e.message} #{e.class} #{e.backtrace}"
  end
end
# Named benchmark profiles. Each profile lists the observer classes and the
# instrument classes that BenchmarkRunner instantiates for a run. Frozen so a
# run cannot accidentally alter the shared definitions.
BENCHMARK_PROFILES = {
  :standard => {
    :observers => [ConsolePrinterObserver].freeze,
    :instruments => [EsThroughputInstrument, EsConnectionInstrument].freeze
  }.freeze
}.freeze
# Command-line front end: parses the options, assembles observers and
# instruments from the chosen profile, runs the benchmark and prints the
# collected metrics.
class BenchmarkRunner < Clamp::Command
  # Option reader => Logstash command-line flag it is forwarded as.
  LOGSTASH_FLAGS = {
    :config_string => "-e",
    :config_file => "-f",
    :logstash_workers => "-w",
    :logstash_log_to => "-l",
    :log_level => "--log.level"
  }.freeze

  option ["-e", "--config-string"], "CONFIG", "Config as string"
  option ["-f", "--config-file"], "FILENAME", "Config file path"
  option ["-w", "--logstash-workers"], "COUNT", "Number of Logstash workers (-w)"
  option ["-l", "--logstash-log-to"], "PATH", "Logstash log location (-l)"
  option ["--log-level"], "LOG_LEVEL", "Logstash log level"
  option "--logstash-path", "PATH", "Path to Logstash binary", :default => "bin/logstash"
  option "--es", "URL", "Remote URL for Elasticsearch. Used for measuring throughput", :default => "http://localhost:9200"
  option "--run-name", "NAME", "Name for this run", :default => "Run #{Time.now}"
  option("--duration", "SECONDS", "Seconds to benchmark", :default => 300) do |s|
    Integer(s)
  end
  option("--warmup", "SECONDS", "Seconds to warmup", :default => 30) do |s|
    Integer(s)
  end
  option "--profile-name", "PROFILE", "Benchmark profile name. One of: #{BENCHMARK_PROFILES.keys.join(', ')}", :default => "standard"

  # Runs one benchmark with the selected profile and prints every instrument's
  # metrics afterwards. An unknown profile name is reported as a usage error.
  def execute
    profile = lookup_profile(profile_name)
    opts = {
      :name => run_name,
      :warmup => warmup,
      :duration => duration,
      :es => es
    }
    opts[:observers] = profile[:observers].map { |observer_class| observer_class.new }
    instruments = profile[:instruments].map { |inst_class| inst_class.new(opts) }
    # Each instrument is driven through the same observer lifecycle.
    instruments.each do |instrument|
      opts[:observers] << InstrumentObserver.new(instrument)
    end

    BenchmarkRun.new(logstash_path, logstash_args, opts).execute

    reporter = ConsoleMetricReporter.new
    instruments.each { |instrument| reporter.report(instrument.recorder) }
  end

  # Builds the Logstash argument list from the options that were supplied;
  # options without a value are left out.
  #
  # @return [Array<String>] alternating flags and values
  def logstash_args
    LOGSTASH_FLAGS.each_with_object([]) do |(reader, flag), args|
      value = public_send(reader)
      args.push(flag, value) if value
    end
  end

  private

  # Resolves a profile by name without turning user input into a symbol.
  # Signals a usage error when no profile with that name exists.
  def lookup_profile(requested)
    key = BENCHMARK_PROFILES.keys.find { |candidate| candidate.to_s == requested.to_s }
    return BENCHMARK_PROFILES[key] if key

    signal_usage_error "Unknown profile '#{requested}'. Available profiles: #{BENCHMARK_PROFILES.keys.join(', ')}"
  end
end
# Script entry point: hand control to the BenchmarkRunner command.
BenchmarkRunner.run
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.