Skip to content

Instantly share code, notes, and snippets.

@thbar
Created June 30, 2015 07:46
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thbar/62b5ace7feebd0260352 to your computer and use it in GitHub Desktop.
Save thbar/62b5ace7feebd0260352 to your computer and use it in GitHub Desktop.
A Kiba ETL script to analyze Sidekiq TTIN logs
require_relative 'common'
# The log to analyze is named by the FILE environment variable; when it is
# not set, print usage instructions and stop.
file = ENV.fetch('FILE') do
  abort("Run with:\n FILE=mylog.log bundle exec kiba #{__FILE__}")
end
# Read the log through MultilineLogSource; the pattern passed as
# new_line_pattern is what that class uses to detect where an entry begins.
source(MultilineLogSource, file, new_line_pattern: /^\d+-\d+-\d+T\d+:\d+:\d+/)
# Keep only entries containing ' WARN: '; anything else becomes nil.
transform { |row| row if row.include?(' WARN: ') }
# Wrap the raw text in a hash so that later steps can attach fields to it.
transform do |r|
  { raw_line: r }
end
# Text captured from the most recent "Thread TID-..." header row, or nil when
# no header is pending.
current_worker = nil
# The stack trace for a given worker is given by the row following its
# "Thread TID-..." header. That next row won't hold the worker id but the
# stacktrace dumper worker id, so the header is saved here and attached to
# the following row.
transform do |row|
  if row[:raw_line] =~ /WARN: Thread (TID\-.*)$/
    current_worker = $1
    # skip this row: a header carries no backtrace of its own
    nil
  else
    # A WARN row may show up without a pending header (for instance an
    # unrelated warning in the same log). Keep it with a nil worker instead
    # of raising NoMethodError on nil.strip.
    row[:worker] = current_worker && current_worker.strip
    current_worker = nil
    # keep the row for processing
    row
  end
end
# Leading space-separated fields of each raw entry, in order of appearance.
keys = %i(timestamp pidfile dumper_worker level)
# Peel one field per key off the front of the raw text, then split whatever
# is left into lines.
transform do |r|
  rest = r[:raw_line]
  keys.each do |key|
    field, _separator, rest = rest.partition(' ')
    r[key] = field
  end
  lines = rest.split("\n")
  r[:content] = lines
  # lines 0 through 50 of the content are used as the grouping key later on
  r[:location] = lines[0..50]
  r
end
# Rows whose raw text matches any of these patterns are rejected; one reject
# step is registered per pattern, in the order listed.
[
  # sleeping workers
  /mailbox.rb:\d+:in `sleep'/,
  # the thread doing the ttin dump
  /cli.rb:\d+:in `backtrace'/,
  # other polluting lines
  /INFO: (start|done)/,
  /JID\-[^ ]+ INFO:/,
  /no backtrace available/
].each do |pattern|
  reject { |r| r[:raw_line] =~ pattern }
end
# the filters above are the last steps reading :raw_line in this section
remove_field(:raw_line)
# Number of rows seen per distinct :location value.
counts = Hash.new(0)
transform do |row|
  row.tap { |r| counts[r[:location]] += 1 }
end
# Number of rows seen per category.
categories = Hash.new(0)
transform do |row|
  # Search the whole location (all its lines joined) for the known markers;
  # the first marker that matches decides the category.
  joined = row[:location].join(' ')
  category =
    if joined =~ /release_connection/
      'release_connection'
    elsif joined =~ /retrieve_connection/
      'retrieve_connection'
    else
      'something else'
    end
  row[:category] = category
  categories[category] += 1
  row
end
# Print the per-location counts (ascending), the total number of counted
# rows, and the per-category counts (ascending).
post_process do
  ap counts.sort_by { |_location, count| count } #.reverse
  # Seed the sum with 0: inject without an initial value returns nil for an
  # empty collection, which printed a blank total when no row survived.
  ap "Total threads: #{ counts.values.inject(0) { |sum, x| sum + x } }"
  ap categories.sort_by { |_category, count| count }
end
#show_me!
require 'awesome_print'
# Registers a transform that lets the first n rows through unchanged and
# aborts the whole run as soon as one more row arrives.
#
# @param n [Integer] maximum number of rows allowed through
def limit(n)
  rows_seen = 0
  transform do |row|
    rows_seen += 1
    next row if rows_seen <= n

    abort("Stopped (limit #{n})")
  end
end
# Registers a debugging transform: each row is printed with `ap` and then
# passed along untouched.
def show_me!
  transform do |row|
    row.tap { |r| ap r }
  end
end
# Registers a transform that turns a row into nil when the given block
# returns a truthy value for it, and passes the row through otherwise.
#
# @yieldparam row the row being examined
# @yieldreturn truthy to discard the row, falsy to keep it
def reject
  transform { |row| row unless yield(row) }
end
# Registers a transform that deletes one key from every row (in place) and
# passes the row along.
#
# @param field the key to delete from each row
def remove_field(field)
  transform do |row|
    row.tap { |r| r.delete(field) }
  end
end
# Source that reads a log file and groups its physical lines into logical
# entries. A line matching new_line_pattern starts a new entry; every other
# line is appended to the entry in progress (e.g. the continuation lines of
# a multi-line backtrace).
class MultilineLogSource
  # @param filename [String] path of the log file to read
  # @param new_line_pattern [Regexp] pattern matched against each physical
  #   line to detect the start of a new entry
  def initialize(filename, new_line_pattern:)
    @filename = filename
    @new_line_pattern = new_line_pattern
  end

  # Yields each logical entry as one String, line terminators included.
  # Lines found before the first match are yielded together as a first
  # entry. Nothing is yielded for an empty file.
  #
  # @yieldparam entry [String] the text of one logical entry
  # @return [Enumerator] when called without a block
  def each
    return enum_for(:each) unless block_given?

    # nil means "no entry started yet". An empty String is truthy in Ruby,
    # so starting from "" made the original yield a spurious empty row
    # before the first real entry.
    buffer = nil
    IO.foreach(@filename) do |line|
      if buffer.nil?
        buffer = line
      elsif line =~ @new_line_pattern
        # new entry detected, so the previous one is complete
        yield(buffer)
        buffer = line
      else
        # append in place rather than allocating a new String per line
        buffer << line
      end
    end
    # at the end, make sure to flush the entry still in progress
    yield(buffer) if buffer
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment