|
#!/usr/bin/env ruby |
|
# coding: UTF-8 |
|
|
|
# elb-logs.rb |
|
# |
|
# fetch and analyze ELB access logs |
|
# (http://docs.aws.amazon.com/ElasticLoadBalancing/latest/DeveloperGuide/access-log-collection.html) |
|
# |
|
# created on : 2014.03.07 |
|
# last update: 2014.03.10 |
|
# |
|
# by meinside@gmail.com |
|
|
|
require 'bundler/setup' |
|
|
|
require 'my_aws' |
|
require 'my_sqlite' |
|
|
|
require 'thor' |
|
require 'geocoder' |
|
|
|
# File name of the SQLite database that stores fetched logs and cached geo
# locations. Database.instance joins it with this script's directory.
DB_FILENAME = 'elb_access_logs.sqlite'

# Settings passed to MyAws::S3.config; :bucket is also used to look up the
# bucket that holds the ELB access logs.
#
# Each value can be supplied through the environment so that real credentials
# never have to be written into this file. When a variable is not set, the
# placeholder below is used, exactly as before (edit it to hard-code a value).
# NOTE: an environment variable, when present, takes precedence over the
# value written here.
S3_CONFIGS = {
  access_key_id: ENV.fetch('AWS_ACCESS_KEY_ID', '__aws_access_key_id__'),
  secret_access_key: ENV.fetch('AWS_SECRET_ACCESS_KEY', '__aws_secret_access_key__'),
  bucket: ENV.fetch('ELB_LOGS_S3_BUCKET', '__s3_bucket_name__'),
}
|
|
|
module AWS |
|
class ELB::AccessLog |
|
attr_reader :timestamp, :elb, :client_ip, :client_port, :backend_ip, :backend_port, :process_time, :received_bytes, :sent_bytes, :request_method, :request_url, :request_protocol, :response_status |
|
|
|
# Parse one line of an ELB access log into its fields.
#
# The line is split on spaces and fields are read by position:
#   0 timestamp, 1 ELB name, 2 client ip:port, 3 backend ip:port,
#   5 -> process_time, 8 -> response_status, 9 received bytes, 10 sent bytes.
# (In the classic ELB log format, field 5 is backend_processing_time and
# field 8 is backend_status_code.)
#
# The request is taken from the FIRST double-quoted field, so lines that
# carry extra fields after the request (user agent, SSL cipher/protocol) are
# parsed too. A '-' placeholder in the request or status becomes nil, which
# is what non-HTTP listeners log.
#
# @param line [String] one access log line
# @raise [TypeError, ArgumentError, NoMethodError] if the positional fields
#   are missing or the timestamp cannot be parsed
def initialize(line)
  components = line.split(' ')

  # timestamp
  @timestamp = Time.parse(components[0])

  # elb name
  @elb = components[1]

  # client
  @client_ip, @client_port = components[2].split(':')

  # backend (EC2 instance)
  @backend_ip, @backend_port = components[3].split(':')

  # time taken
  @process_time = components[5].to_f

  # recv/send
  @received_bytes = components[9].to_i
  @sent_bytes = components[10].to_i

  # request (HTTP only): "METHOD URL PROTOCOL"; nil when no quoted field exists
  request = line[/"(.*?)"/, 1]
  request_parts = request.to_s.split(/\s/).map { |part| part == '-' ? nil : part }
  @request_method, @request_url, @request_protocol = request_parts

  # response (HTTP only)
  status = components[8]
  @response_status = status == '-' ? nil : status
end
|
|
|
# One-line summary of the entry: ELB name, timestamp, client and backend
# endpoints, processing time and byte counts. The request details are
# appended only when a request method was parsed.
#
# @return [String]
def to_s
  stamp = @timestamp.strftime('%Y-%m-%d %H:%M:%S.%N')
  summary = "[#{@elb}] #{stamp} | #{@client_ip}:#{@client_port} - #{@backend_ip}:#{@backend_port}, " \
            "#{@process_time} seconds, recv: #{@received_bytes} / send: #{@sent_bytes} bytes"
  return summary if @request_method.nil?

  "#{summary} (#{@request_method} #{@request_protocol} #{@request_url} => #{@response_status})"
end
|
|
|
# Helpers for the S3 object keys under which ELB access logs are stored.
class Helper
  class << self
    # Split the file name of a log object's key on '_' and pick out the parts
    # by position: 0 account, 2 region, 3 ELB name, 4 timestamp, 5 ELB ip.
    # Any directory prefix and the final extension are dropped first.
    #
    # @param key [String] S3 object key of a log file
    # @return [Hash] :aws_account, :region, :elb, :datetime (Time), :elb_ip
    #   and the untouched :key
    def parse_key(key)
      file_name = key.split('/').last
      account, _service, region, elb_name, stamp, elb_ip = File.basename(file_name, '.*').split('_')

      {
        aws_account: account,
        region: region,
        elb: elb_name,
        datetime: Time.parse(stamp),
        elb_ip: elb_ip,
        key: key,
      }
    end
  end
end
|
|
|
class Database |
|
attr_accessor :filepath |
|
|
|
@@db = nil |
|
|
|
private |
|
|
|
# Open the SQLite database at +filepath+ through MySqlite and make sure the
# tables and indexes used by this tool exist. Every statement is
# "create ... if not exists", so running it against an existing file only
# adds what is missing.
#
# @param filepath [String] path of the SQLite database file
def initialize(filepath)
  @filepath = filepath
  @db = MySqlite.open(@filepath)

  schema = [
    # table: fetched (one row per log file that was fully imported)
    'create table if not exists fetched(
      id integer primary key autoincrement,
      aws_account text not null,
      region text not null,
      elb_ip text not null,
      log_time text not null
    )',
    'create index if not exists idx_fetched on fetched(
      aws_account, region, elb_ip, log_time
    )',

    # table: access_logs (one row per parsed log line)
    'create table if not exists access_logs(
      id integer primary key autoincrement,
      aws_account text not null,
      region text not null,
      elb_ip text not null,
      log_time text not null,
      elb text not null,
      timestamp text not null,
      client_ip text not null,
      client_port integer default null,
      backend_ip text not null,
      backend_port integer default null,
      process_time real not null,
      received_bytes integer not null,
      sent_bytes integer not null,
      request_method text default null,
      request_url text default null,
      request_protocol text default null,
      response_status text default null
    )',
    'create index if not exists idx_access_logs_1 on access_logs(
      aws_account, region, elb_ip
    )',
    'create index if not exists idx_access_logs_2 on access_logs(
      aws_account, region, elb_ip, log_time
    )',
    'create index if not exists idx_access_logs_3 on access_logs(
      timestamp
    )',
    'create index if not exists idx_access_logs_4 on access_logs(
      client_ip
    )',
    'create index if not exists idx_access_logs_5 on access_logs(
      response_status
    )',

    # table: geo_locations (geo lookup cache, keyed by ip)
    'create table if not exists geo_locations(
      ip text primary key,
      country_code text default null,
      country_name text default null,
      city text default null,
      latitude real default null,
      longitude real default null
    )',
    'create index if not exists idx_geo_locations_1 on geo_locations(
      country_code
    )',
    'create index if not exists idx_geo_locations_2 on geo_locations(
      country_name
    )',
    'create index if not exists idx_geo_locations_999 on geo_locations(
      latitude,
      longitude
    )',
  ]

  # run in the listed order: each table before its indexes
  schema.each { |statement| @db.execute_query(statement) }
end
|
|
|
public |
|
|
|
# Return the shared Database, creating it on first use. The database file is
# DB_FILENAME inside the directory of this script.
#
# @return [Database] the single shared instance
def self.instance
  @@db ||= Database.new(File.join(File.dirname(__FILE__), DB_FILENAME))
end
|
|
|
# Tell whether a log file with the given aws_account/region/elb_ip/log_time
# has already been recorded in the `fetched` table.
#
# @param aws_account [String] aws account
# @param region [String] region
# @param elb_ip [String] ELB's ip address
# @param log_time [Time] log time (stored with second precision)
#
# @return [true,false]
def log_exists?(aws_account, region, elb_ip, log_time)
  stamp = log_time.strftime('%Y-%m-%d %H:%M:%S')
  rows = @db.execute_query(
    'select count(id) from fetched where aws_account = ? and region = ? and elb_ip = ? and log_time = ?',
    [aws_account, region, elb_ip, stamp]
  )

  # count(id) always yields exactly one row with one column
  rows[0][0] > 0
end
|
|
|
# Insert one parsed log entry into `access_logs`, together with the
# identifying fields of the log file it came from.
#
# @param aws_account [String] aws account
# @param region [String] region
# @param elb_ip [String] ELB's ip address
# @param log_time [Time] log time of the file (stored with second precision)
# @param log_entry [AWS::ELB::AccessLog] parsed log entry
def save_log(aws_account, region, elb_ip, log_time, log_entry)
  insert_sql = 'insert into access_logs(
    aws_account, region, elb_ip, log_time,
    elb, timestamp, client_ip, client_port, backend_ip, backend_port, process_time, received_bytes, sent_bytes,
    request_method, request_url, request_protocol, response_status
  ) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'

  # values are listed in the same order as the columns above
  file_values = [aws_account, region, elb_ip, log_time.strftime('%Y-%m-%d %H:%M:%S')]
  entry_values = [
    log_entry.elb,
    log_entry.timestamp.strftime('%Y-%m-%d %H:%M:%S.%N'),
    log_entry.client_ip,
    log_entry.client_port,
    log_entry.backend_ip,
    log_entry.backend_port,
    log_entry.process_time,
    log_entry.received_bytes,
    log_entry.sent_bytes,
    log_entry.request_method,
    log_entry.request_url,
    log_entry.request_protocol,
    log_entry.response_status,
  ]

  @db.execute_query(insert_sql, file_values + entry_values)
end
|
|
|
# Record in the `fetched` table that the log file identified by the given
# fields has been imported, so log_exists? reports it from now on.
#
# @param aws_account [String] aws account
# @param region [String] region
# @param elb_ip [String] ip address
# @param log_time [Time] log time (stored with second precision)
def mark_log_fetched(aws_account, region, elb_ip, log_time)
  stamp = log_time.strftime('%Y-%m-%d %H:%M:%S')
  row = [aws_account, region, elb_ip, stamp]

  @db.execute_query('insert into fetched(aws_account, region, elb_ip, log_time) values(?, ?, ?, ?)', row)
end
|
|
|
# List the client ips found in the saved access logs, ordered by log
# timestamp.
#
# @param option [Hash, nil] listing options. `unique: true` removes duplicate
#   ips; with nil (the default) or without that key every logged ip is
#   returned, duplicates included.
# @return [Array<String>] array of ips
def client_ips(option = nil)
  # the default argument is nil, so it must not be indexed unconditionally
  unique = option ? option[:unique] : false
  query = "select #{unique ? 'distinct ' : ''}client_ip from access_logs order by timestamp"

  @db.execute_query(query).map { |row| row[0] }
end
|
|
|
# Look up the geo location of +ip+ with Geocoder and store it in
# `geo_locations` for future use, unless the ip is cached already.
#
# The row is keyed by the +ip+ that was looked up, so a later
# cached_geo(ip) is guaranteed to find it.
#
# @param ip [String] ip address
# @return [Database::GeoLocation, nil] the newly cached location; nil when
#   the ip was already cached, the lookup returned no result, or an error
#   was raised (the error is printed, not propagated)
def cache_geo(ip)
  return nil if cached_geo(ip)

  info = Geocoder.search(ip).first
  if info.nil?
    puts "* no geo location found for ip: #{ip}"
    return nil
  end

  values = [ip, info.country_code, info.country, info.city, info.latitude, info.longitude]
  @db.execute_query(
    'insert or replace into geo_locations(ip, country_code, country_name, city, latitude, longitude) values(?, ?, ?, ?, ?, ?)',
    values
  )
  GeoLocation.new(*values)
rescue StandardError => e
  # best effort: one failed lookup must not abort caching of the other ips
  puts "* exception while caching ip: #{ip} - #{e}"
  nil
end
|
|
|
# Get the cached geo location information of +ip+.
#
# The columns are selected by name, in the order GeoLocation.new expects
# them, and the location is built from the FIRST ROW of the result (the
# result itself is a list of rows).
#
# @param ip [String] ip address
# @return [Database::GeoLocation, nil] nil if not cached
def cached_geo(ip)
  rows = @db.execute_query(
    'select ip, country_code, country_name, city, latitude, longitude from geo_locations where ip = ?',
    [ip]
  )

  row = rows.first
  return nil if row.nil?

  GeoLocation.new(row[0], row[1], row[2], row[3], row[4], row[5])
end
|
|
|
# List the distinct ELB names that appear in `access_logs`, sorted by name.
#
# @return [Array<String>] ELB names
def elbs
  name_rows = @db.execute_query('select distinct elb from access_logs order by elb')
  name_rows.map { |row| row[0] }
end
|
|
|
# Count, per country, the cached ips that appear as client ips in the access
# logs of the given ELB. Countries with the most ips come first. Only ips
# present in `geo_locations` are counted.
#
# @param elb [String] ELB name
# @return [Array<Hash>] one hash per country: {country: name, num_ips: count}
def num_ips_per_country(elb)
  country_rows = @db.execute_query(
    'select country_name as country, count(ip) as num_ips
      from geo_locations
      where ip in (select distinct client_ip from access_logs where elb = ?)
      group by country_name
      order by num_ips desc',
    [elb]
  )

  country_rows.map do |country, ip_count|
    { country: country, num_ips: ip_count }
  end
end
|
|
|
# Summarize the contents of the database.
#
# Works on an empty database too: :from and :to are nil when `access_logs`
# has no rows.
#
# @return [Hash]
#   :from            [String, nil] earliest stored timestamp
#   :to              [String, nil] latest stored timestamp
#   :num_access_logs [Hash] log line count per ELB name, plus :all
#   :num_ips         [Hash] per ELB name, the number of cached ips seen as
#                    client ips of that ELB; :all is the number of cached ips
def info
  # from ~ to (the result has no rows when the table is empty)
  first_row = @db.execute_query('select timestamp from access_logs order by timestamp limit 1').first
  last_row = @db.execute_query('select timestamp from access_logs order by timestamp desc limit 1').first

  # query the ELB names once and reuse them for both breakdowns
  elb_names = elbs

  # number of access logs (count queries always return exactly one row)
  num_access_logs = {}
  elb_names.each do |elb|
    num_access_logs[elb] = @db.execute_query('select count(id) from access_logs where elb = ?', [elb]).first[0]
  end
  num_access_logs[:all] = @db.execute_query('select count(id) from access_logs').first[0]

  # number of ips
  num_ips = {}
  elb_names.each do |elb|
    num_ips[elb] = @db.execute_query(
      'select count(ip) from geo_locations
        where ip in (select distinct client_ip from access_logs where elb = ?)',
      [elb]
    ).first[0]
  end
  num_ips[:all] = @db.execute_query('select count(distinct ip) from geo_locations').first[0]

  {
    from: first_row && first_row[0],
    to: last_row && last_row[0],
    num_access_logs: num_access_logs,
    num_ips: num_ips,
  }
end
|
|
|
class GeoLocation |
|
attr_accessor :ip, :country_code, :country_name, :city, :latitude, :longitude |
|
|
|
def initialize(ip, country_code, country_name, city, latitude, longitude) |
|
@ip = ip |
|
@country_code = country_code |
|
@country_name = country_name |
|
@city = city |
|
@latitude = latitude |
|
@longitude = longitude |
|
end |
|
|
|
def to_s |
|
"[%s] %s(%s), %s (%.4f, %.4f)" %[@ip, @country_name, @country_code, @city, @latitude, @longitude] |
|
end |
|
end |
|
end |
|
end |
|
end |
|
|
|
class Exec < Thor |
|
default_task :fetch_logs |
|
|
|
desc "fetch_logs", "fetch all ELB access log files from S3"
method_option :cache_geo, type: :boolean, aliases: '-g', desc: 'also cache geo locations for logged ips'
method_option :verbose, type: :boolean, aliases: '-v', desc: 'show verbose messages'
# Import every ELB access log file of the configured S3 bucket that has not
# been imported yet: each non-blank line is parsed and saved, then the file
# is marked as fetched. With --cache_geo the geo locations of the logged
# client ips are cached afterwards.
def fetch_logs
  puts "> fetching logs..."

  # configure
  unless MyAws::S3.config(S3_CONFIGS)
    puts "* S3 configuration failed"
    return
  end

  # the database is a shared instance, so look it up once
  db = AWS::ELB::AccessLog::Database.instance
  num_logs = 0

  # fetch objects from the bucket
  MyAws::S3.bucket(S3_CONFIGS[:bucket]).objects.each do |o|
    # parse only actual log files
    next unless o.key =~ /elasticloadbalancing.*?\.log$/

    parsed = AWS::ELB::AccessLog::Helper.parse_key(o.key)
    aws_account = parsed[:aws_account]
    elb = parsed[:elb]
    region = parsed[:region]
    elb_ip = parsed[:elb_ip]
    log_time = parsed[:datetime]
    log_label = "#{aws_account}/#{region}/#{elb}(#{elb_ip})/#{log_time.strftime('%Y-%m-%d %H:%M:%S')}"

    # check if this log is already fetched/saved
    if db.log_exists?(aws_account, region, elb_ip, log_time)
      puts "* skipping already-fetched log: #{log_label}" if options.verbose?
      next
    end
    num_logs += 1

    # read all log lines
    puts "> processing logs for: #{log_label}" if options.verbose?
    o.read.each_line do |raw_line|
      line = raw_line.strip
      # a blank line has no fields to parse and would make the parser raise
      next if line.empty?

      db.save_log(aws_account, region, elb_ip, log_time, AWS::ELB::AccessLog.new(line))
    end

    # mark log as fetched (only after all of its lines were saved)
    db.mark_log_fetched(aws_account, region, elb_ip, log_time)
  end

  puts ">>> fetched #{num_logs} new log file(s)"

  # also cache geo locations if option is provided
  cache_geo if options.cache_geo?
end
|
|
|
desc "cache_geo", "cache geo location info for client ips in saved logs"
method_option :verbose, type: :boolean, aliases: '-v', desc: 'show verbose messages'
# Cache the geo location of every distinct client ip found in the saved
# logs and report how many locations were newly cached.
def cache_geo
  puts "> caching geo locations for logged ips..."

  database = AWS::ELB::AccessLog::Database.instance

  # Database#cache_geo returns the location only when it was newly cached,
  # so counting its truthy results counts the new cache entries
  newly_cached = database.client_ips({unique: true}).count do |ip|
    location = database.cache_geo(ip)
    puts "> cached geo ip: #{location}" if location && options.verbose?
    location
  end

  puts ">>> cached #{newly_cached} new geo location(s)"
end
|
|
|
desc "count_ips", "list number of unique ips per country"
# Print, for each ELB found in the saved logs, the number of ips per country
# followed by a blank line.
def count_ips
  puts "> counting ips per country..."

  database = AWS::ELB::AccessLog::Database.instance

  database.elbs.each do |elb_name|
    puts ">>> ELB: #{elb_name}"

    database.num_ips_per_country(elb_name).each do |tally|
      puts "#{tally[:country]}: #{tally[:num_ips]}"
    end

    puts
  end
end
|
|
|
desc "info", "show info on database file"
# Print the database file path, the time range covered by the saved logs and
# the per-ELB counts of access log lines and ips.
def info
  puts "> printing info of database..."

  database = AWS::ELB::AccessLog::Database.instance
  puts ">>> database: #{database.filepath}"

  summary = database.info
  puts ">>> #{summary[:from]} ~ #{summary[:to]}"

  # both count sections share one layout: a title, then one "name: count" per line
  sections = [
    ['number of accesses', :num_access_logs],
    ['number of ips', :num_ips],
  ]
  sections.each do |title, summary_key|
    puts ">>> #{title}"
    summary[summary_key].each do |name, count|
      puts " #{name}: #{count}"
    end
  end
end
|
end |
|
|
|
# On Ctrl-C print a newline and exit with status 1.
Signal.trap('INT') do
  puts
  exit(1)
end

# Hand the command line arguments to the Thor command class.
Exec.start(ARGV)