Created
June 30, 2015 07:46
-
-
Save thbar/62b5ace7feebd0260352 to your computer and use it in GitHub Desktop.
A Kiba ETL script to analyze Sidekiq TTIN logs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require_relative 'common' | |
file = ENV['FILE'] | |
unless file | |
abort("Run with:\n FILE=mylog.log bundle exec kiba #{__FILE__}") | |
end | |
source MultilineLogSource, file, new_line_pattern: /^\d+-\d+-\d+T\d+:\d+:\d+/ | |
transform do |row| | |
row.include?(' WARN: ') ? row : nil | |
end | |
transform { |r| { raw_line: r } } | |
current_worker = nil | |
# the stack trace for a given worker is given by the next line, which | |
# won't hold the worker id but the stacktrace dumper worker id! save it | |
# for the next row | |
transform do |row| | |
if row[:raw_line] =~ /WARN: Thread (TID\-.*)$/ | |
current_worker = $1 | |
# skip this row | |
nil | |
else | |
row[:worker] = current_worker.strip | |
current_worker = nil | |
# keep the row for processing | |
row | |
end | |
end | |
keys = %i(timestamp pidfile dumper_worker level) | |
transform do |r| | |
remaining = r[:raw_line] | |
keys.each do |k| | |
r[k], split, remaining = *remaining.partition(' ') | |
end | |
r[:content] = remaining.split("\n") | |
r[:location] = r[:content][0..50] | |
r | |
end | |
# remove sleeping workers | |
reject { |r| r[:raw_line] =~ /mailbox.rb:\d+:in `sleep'/ } | |
# remove the thread doing the ttin dump | |
reject { |r| r[:raw_line] =~ /cli.rb:\d+:in `backtrace'/ } | |
# remove other polluting lines | |
reject { |r| r[:raw_line] =~ /INFO: (start|done)/ } | |
reject { |r| r[:raw_line] =~ /JID\-[^ ]+ INFO:/ } | |
reject { |r| r[:raw_line] =~ /no backtrace available/ } | |
remove_field(:raw_line) | |
counts = Hash.new(0) | |
transform do |row| | |
counts[row[:location]] += 1 | |
row | |
end | |
categories = Hash.new(0) | |
transform do |row| | |
row[:category] = case row[:location].join(' ') | |
when /release_connection/ then 'release_connection' | |
when /retrieve_connection/ then 'retrieve_connection' | |
else 'something else' | |
end | |
categories[row[:category]] += 1 | |
row | |
end | |
post_process do | |
ap counts.sort_by { |k,v| v } #.reverse | |
ap "Total threads: #{ counts.values.inject { |sum,x| sum + x } }" | |
ap categories.sort_by { |k,v| v } | |
end | |
#show_me! | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'awesome_print' | |
def limit(n) | |
counter = 0 | |
transform do |row| | |
counter += 1 | |
abort("Stopped (limit #{n})") if counter > n | |
row | |
end | |
end | |
def show_me! | |
transform do |row| | |
ap row | |
row | |
end | |
end | |
def reject | |
transform do |row| | |
yield(row) ? nil : row | |
end | |
end | |
def remove_field(field) | |
transform do |row| | |
row.delete(field) | |
row | |
end | |
end | |
class MultilineLogSource | |
def initialize(filename, new_line_pattern:) | |
@filename = filename | |
@new_line_pattern = new_line_pattern | |
end | |
def each | |
buffer = "" | |
IO.foreach(@filename) do |line| | |
if line =~ @new_line_pattern | |
# new line detected, so consider the previous line complete | |
yield(buffer) if buffer | |
buffer = line | |
else | |
buffer += line | |
end | |
end | |
# at the end, make sure to flush remaining line | |
yield(buffer) if buffer | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment