Skip to content

Instantly share code, notes, and snippets.

@mariusv

mariusv/Gemfile

Last active Aug 29, 2015
Embed
What would you like to do?

## Requirements

  • git
  • json (1.8.1)
  • mysql (2.9.1)
  • sqlite3 (1.3.10)

How to install the native dependencies:

Debian-based OS:

  • apt-get install libmysqlclient-dev libsqlite3-dev

OSX:

  • brew install mysql

  • brew install sqlite3

  • gem install mysql -v '2.9.1'

  • gem install sqlite3 -v '1.3.10'

#!/usr/bin/env ruby
# coding: UTF-8
# elb-logs.rb
#
# fetch and analyze ELB access logs
# (http://docs.aws.amazon.com/ElasticLoadBalancing/latest/DeveloperGuide/access-log-collection.html)
#
# created on : 2014.03.07
# last update: 2014.03.10
#
# by meinside@gmail.com
require 'bundler/setup'
require 'my_aws'
require 'my_sqlite'
require 'thor'
require 'geocoder'
# File name of the SQLite database that stores the fetched logs.
DB_FILENAME = 'elb_access_logs.sqlite'

# S3 settings; the values below are placeholders to be replaced with real
# credentials and the name of the bucket that holds the ELB access logs.
S3_CONFIGS = {
  access_key_id: '__aws_access_key_id__',
  secret_access_key: '__aws_secret_access_key__',
  bucket: '__s3_bucket_name__'
}
module AWS
class ELB::AccessLog
attr_reader :timestamp, :elb, :client_ip, :client_port, :backend_ip, :backend_port, :process_time, :received_bytes, :sent_bytes, :request_method, :request_url, :request_protocol, :response_status
# Parses one line of an ELB access log.
#
# The line is split on whitespace and fields are read by position:
# 0 = timestamp, 1 = ELB name, 2 = client ip:port, 3 = backend ip:port,
# 5 = process time, 8 = response status, 9 = received bytes, 10 = sent bytes.
# The request is taken from the first double-quoted field of the line.
#
# @param line [String] a single, non-empty access log line
def initialize(line)
  fields = line.split(' ')

  # timestamp
  @timestamp = Time.parse(fields[0])

  # elb name
  @elb = fields[1]

  # client
  @client_ip, @client_port = fields[2].split(':')

  # backend (EC2 instance)
  @backend_ip, @backend_port = fields[3].split(':')

  # time taken
  @process_time = fields[5].to_f

  # recv/send
  @received_bytes = fields[9].to_i
  @sent_bytes = fields[10].to_i

  # request (HTTP only)
  # Use the FIRST quoted field and do not anchor it to the end of the line:
  # lines that carry extra fields after the request (user agent, SSL cipher,
  # ...) would otherwise not match at all and crash the parser. A line
  # without any quoted field simply yields nil request attributes.
  request = line[/"(.*?)"/, 1].to_s
  @request_method, @request_url, @request_protocol =
    request.split(/\s/).map { |part| part == '-' ? nil : part }

  # response (HTTP only)
  @response_status = fields[8]
  @response_status = nil if @response_status == '-'
end
# One-line, human-readable summary of this log entry. The request details
# are appended only when a request method was parsed.
#
# @return [String]
def to_s
  stamp = @timestamp.strftime('%Y-%m-%d %H:%M:%S.%N')
  summary = "[#{@elb}] #{stamp} | #{@client_ip}:#{@client_port} - #{@backend_ip}:#{@backend_port}, " \
            "#{@process_time} seconds, recv: #{@received_bytes} / send: #{@sent_bytes} bytes"
  return summary if @request_method.nil?

  "#{summary} (#{@request_method} #{@request_protocol} #{@request_url} => #{@response_status})"
end
# Helpers for the S3 object keys under which log files are stored.
class Helper
  # Splits the file name of a log object key (extension removed) on '_' and
  # returns the parts this script uses.
  #
  # @param key [String] S3 object key of a log file
  # @return [Hash] :aws_account, :region, :elb, :datetime (Time), :elb_ip
  #   and the original :key
  def self.parse_key(key)
    filename = key.split('/').last
    # the second part is not used
    account, _unused, region, elb_name, stamp, ip = File.basename(filename, '.*').split('_')

    {
      aws_account: account,
      region: region,
      elb: elb_name,
      datetime: Time.parse(stamp),
      elb_ip: ip,
      key: key,
    }
  end
end
class Database
attr_accessor :filepath
@@db = nil
private
# Opens the database at +filepath+ through MySqlite and issues the
# "create ... if not exists" statements for every table and index used by
# this script, in a fixed order.
#
# @param filepath [String] path of the SQLite database file
def initialize(filepath)
  @filepath = filepath
  @db = MySqlite.open(@filepath)

  # each statement is listed line by line and joined with newlines
  schema = [
    # table: fetched
    [
      'create table if not exists fetched(',
      'id integer primary key autoincrement,',
      'aws_account text not null,',
      'region text not null,',
      'elb_ip text not null,',
      'log_time text not null',
      ')'
    ],
    [
      'create index if not exists idx_fetched on fetched(',
      'aws_account, region, elb_ip, log_time',
      ')'
    ],

    # table: access_logs
    [
      'create table if not exists access_logs(',
      'id integer primary key autoincrement,',
      'aws_account text not null,',
      'region text not null,',
      'elb_ip text not null,',
      'log_time text not null,',
      'elb text not null,',
      'timestamp text not null,',
      'client_ip text not null,',
      'client_port integer default null,',
      'backend_ip text not null,',
      'backend_port integer default null,',
      'process_time real not null,',
      'received_bytes integer not null,',
      'sent_bytes integer not null,',
      'request_method text default null,',
      'request_url text default null,',
      'request_protocol text default null,',
      'response_status text default null',
      ')'
    ],
    [
      'create index if not exists idx_access_logs_1 on access_logs(',
      'aws_account, region, elb_ip',
      ')'
    ],
    [
      'create index if not exists idx_access_logs_2 on access_logs(',
      'aws_account, region, elb_ip, log_time',
      ')'
    ],
    [
      'create index if not exists idx_access_logs_3 on access_logs(',
      'timestamp',
      ')'
    ],
    [
      'create index if not exists idx_access_logs_4 on access_logs(',
      'client_ip',
      ')'
    ],
    [
      'create index if not exists idx_access_logs_5 on access_logs(',
      'response_status',
      ')'
    ],

    # table: geo_locations
    [
      'create table if not exists geo_locations(',
      'ip text primary key,',
      'country_code text default null,',
      'country_name text default null,',
      'city text default null,',
      'latitude real default null,',
      'longitude real default null',
      ')'
    ],
    [
      'create index if not exists idx_geo_locations_1 on geo_locations(',
      'country_code',
      ')'
    ],
    [
      'create index if not exists idx_geo_locations_2 on geo_locations(',
      'country_name',
      ')'
    ],
    [
      'create index if not exists idx_geo_locations_999 on geo_locations(',
      'latitude,',
      'longitude',
      ')'
    ]
  ]

  schema.each { |statement_lines| @db.execute_query(statement_lines.join("\n")) }
end
public
# Returns the shared Database, creating it on first use. The database file
# is DB_FILENAME, located in the same directory as this script.
#
# @return [Database]
def self.instance
  @@db ||= Database.new(File.join(File.dirname(__FILE__), DB_FILENAME))
end
# Tells whether a log file with the given identity has already been
# recorded in the `fetched` table.
#
# @param aws_account [String] aws account
# @param region [String] region
# @param elb_ip [String] ELB's ip address
# @param log_time [Time] log time (stored with second precision)
#
# @return [true,false]
def log_exists?(aws_account, region, elb_ip, log_time)
  stamp = log_time.strftime('%Y-%m-%d %H:%M:%S')
  rows = @db.execute_query(
    'select count(id) from fetched where aws_account = ? and region = ? and elb_ip = ? and log_time = ?',
    [aws_account, region, elb_ip, stamp]
  )
  rows[0][0] > 0
end
# Inserts one parsed log entry into the `access_logs` table, together with
# the identity of the log file it came from.
#
# @param aws_account [String] aws account
# @param region [String] region
# @param elb_ip [String] ELB's ip address
# @param log_time [Time] log time of the file (stored with second precision)
# @param log_entry [AWS::ELB::AccessLog] parsed log entry
def save_log(aws_account, region, elb_ip, log_time, log_entry)
  sql = [
    'insert into access_logs(',
    'aws_account, region, elb_ip, log_time,',
    'elb, timestamp, client_ip, client_port, backend_ip, backend_port, process_time, received_bytes, sent_bytes,',
    'request_method, request_url, request_protocol, response_status',
    ') values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
  ].join("\n")

  # columns describing the log file
  file_values = [aws_account, region, elb_ip, log_time.strftime('%Y-%m-%d %H:%M:%S')]

  # columns describing the entry itself
  entry_values = [
    log_entry.elb,
    log_entry.timestamp.strftime('%Y-%m-%d %H:%M:%S.%N'),
    log_entry.client_ip,
    log_entry.client_port,
    log_entry.backend_ip,
    log_entry.backend_port,
    log_entry.process_time,
    log_entry.received_bytes,
    log_entry.sent_bytes,
    log_entry.request_method,
    log_entry.request_url,
    log_entry.request_protocol,
    log_entry.response_status
  ]

  @db.execute_query(sql, file_values + entry_values)
end
# Records a log file in the `fetched` table.
#
# @param aws_account [String] aws account
# @param region [String] region
# @param elb_ip [String] ip address
# @param log_time [Time] log time (stored with second precision)
def mark_log_fetched(aws_account, region, elb_ip, log_time)
  row = [aws_account, region, elb_ip, log_time.strftime('%Y-%m-%d %H:%M:%S')]
  @db.execute_query('insert into fetched(aws_account, region, elb_ip, log_time) values(?, ?, ?, ?)', row)
end
# Lists the client ips found in the saved access logs, ordered by timestamp.
#
# @param option [Hash, nil] listing options; `unique: true` returns each ip
#   only once. May be omitted.
# @return [Array<String>] array of ips
def client_ips(option = nil)
  # option defaults to nil, so it must not be indexed unconditionally
  unique = (option || {})[:unique]
  select = unique ? 'select distinct' : 'select'

  @db.execute_query("#{select} client_ip from access_logs order by timestamp").map { |row| row[0] }
end
# Looks up the geo location of +ip+ with Geocoder and stores it in the
# `geo_locations` table, unless a row for that ip is already cached.
#
# Any error raised during the lookup or the insert is reported on stdout
# and swallowed, so one failing ip does not abort the caller.
#
# @param ip [String] ip address
# @return [Database::GeoLocation, nil] the newly cached location; nil when
#   the ip was already cached, no location was found, or an error occurred
def cache_geo(ip)
  return nil if cached_geo(ip)

  info = Geocoder.search(ip).first
  if info.nil?
    # an empty search result is not an error; report it as what it is
    puts "* no geo location found for ip: #{ip}"
    return nil
  end

  values = [info.ip, info.country_code, info.country, info.city, info.latitude, info.longitude]
  @db.execute_query(
    'insert or replace into geo_locations(ip, country_code, country_name, city, latitude, longitude) values(?, ?, ?, ?, ?, ?)',
    values
  )
  GeoLocation.new(*values)
rescue => e
  puts "* exception while caching ip: #{ip} - #{e}"
  nil
end
# Fetches the cached geo location of +ip+ from the `geo_locations` table.
#
# @param ip [String] ip address
# @return [Database::GeoLocation, nil] nil if not cached
def cached_geo(ip)
  rows = @db.execute_query('select * from geo_locations where ip = ?', [ip])
  row = rows.first
  return nil if row.nil?

  # The query returns a list of rows; the location is built from the
  # columns of the first row (ip, country_code, country_name, city,
  # latitude, longitude), not from the rows themselves.
  GeoLocation.new(row[0], row[1], row[2], row[3], row[4], row[5])
end
# Lists the distinct ELB names found in the saved access logs, sorted by name.
#
# @return [Array<String>] ELB names
def elbs
  rows = @db.execute_query('select distinct elb from access_logs order by elb')
  rows.map { |row| row[0] }
end
# Counts, per country, the cached geo locations whose ip appears as a
# client ip in the access logs of the given ELB; largest count first.
#
# @param elb [String] ELB name
# @return [Array<Hash>] hashes with :country and :num_ips
def num_ips_per_country(elb)
  sql = "select country_name as country, count(ip) as num_ips\n" \
        "from geo_locations\n" \
        "where ip in (select distinct client_ip from access_logs where elb = ?)\n" \
        "group by country_name\n" \
        'order by num_ips desc'

  rows = @db.execute_query(sql, [elb])
  rows.map do |row|
    { country: row[0], num_ips: row[1] }
  end
end
# Summarizes the contents of the database.
#
# @return [Hash] with
#   :from / :to        - earliest / latest access log timestamp (nil when
#                        there are no access logs),
#   :num_access_logs   - number of access logs per ELB name, plus :all,
#   :num_ips           - number of cached geo-located client ips per ELB
#                        name, plus :all
def info
  # first column of the first row, or nil when the query returns no rows
  # (e.g. on a database that has no access logs yet)
  first_value = lambda do |rows|
    row = rows.first
    row && row[0]
  end

  # from ~ to
  from = first_value.call(@db.execute_query('select timestamp from access_logs order by timestamp limit 1'))
  to = first_value.call(@db.execute_query('select timestamp from access_logs order by timestamp desc limit 1'))

  # the ELB names are needed for both per-ELB counts; query them once
  elb_names = elbs

  # number of access logs
  num_access_logs = {}
  elb_names.each do |elb|
    num_access_logs[elb] = first_value.call(
      @db.execute_query('select count(id) from access_logs where elb = ?', [elb])
    )
  end
  num_access_logs[:all] = first_value.call(@db.execute_query('select count(id) from access_logs'))

  # number of ips
  num_ips = {}
  elb_names.each do |elb|
    num_ips[elb] = first_value.call(
      @db.execute_query(
        "select count(ip) from geo_locations\n" \
        'where ip in (select distinct client_ip from access_logs where elb = ?)',
        [elb]
      )
    )
  end
  num_ips[:all] = first_value.call(@db.execute_query('select count(distinct ip) from geo_locations'))

  {
    from: from,
    to: to,
    num_access_logs: num_access_logs,
    num_ips: num_ips,
  }
end
# Geo location information of a single ip address.
class GeoLocation
  attr_accessor :ip, :country_code, :country_name, :city, :latitude, :longitude

  # @param ip [String] ip address
  # @param country_code [String, nil]
  # @param country_name [String, nil]
  # @param city [String, nil]
  # @param latitude [Numeric, nil]
  # @param longitude [Numeric, nil]
  def initialize(ip, country_code, country_name, city, latitude, longitude)
    @ip = ip
    @country_code = country_code
    @country_name = country_name
    @city = city
    @latitude = latitude
    @longitude = longitude
  end

  # Human-readable form; coordinates are shown with four decimals.
  #
  # A missing (nil) coordinate is shown as "n/a": formatting nil with
  # "%.4f" would raise a TypeError.
  #
  # @return [String]
  def to_s
    coordinates = [@latitude, @longitude].map do |value|
      value.nil? ? 'n/a' : format('%.4f', value)
    end

    "[%s] %s(%s), %s (%s, %s)" % [@ip, @country_name, @country_code, @city, *coordinates]
  end
end
end
end
end
class Exec < Thor
default_task :fetch_logs
desc "fetch_logs", "fetch all ELB access log files from S3"
method_option :cache_geo, type: :boolean, aliases: '-g', desc: 'also cache geo locations for logged ips'
method_option :verbose, type: :boolean, aliases: '-v', desc: 'show verbose messages'
# Fetches every ELB access log file from the configured S3 bucket and saves
# its entries to the database. Log files already marked as fetched are
# skipped. With the cache_geo option, geo locations of the logged client
# ips are cached afterwards.
def fetch_logs
  puts "> fetching logs..."

  # configure
  unless MyAws::S3.config(S3_CONFIGS)
    puts "* S3 configuration failed"
    return
  end

  db = AWS::ELB::AccessLog::Database.instance
  num_logs = 0

  # fetch objects from the bucket
  MyAws::S3.bucket(S3_CONFIGS[:bucket]).objects.each do |o|
    # parse only actual log files
    next unless o.key =~ /elasticloadbalancing.*?\.log$/

    parsed = AWS::ELB::AccessLog::Helper.parse_key(o.key)
    aws_account = parsed[:aws_account]
    elb = parsed[:elb]
    region = parsed[:region]
    elb_ip = parsed[:elb_ip]
    log_time = parsed[:datetime]
    label = "#{aws_account}/#{region}/#{elb}(#{elb_ip})/#{log_time.strftime('%Y-%m-%d %H:%M:%S')}"

    # check if this log is already fetched/saved
    if db.log_exists?(aws_account, region, elb_ip, log_time)
      puts "* skipping already-fetched log: #{label}" if options.verbose?
      next
    end
    num_logs += 1

    # read all log lines
    puts "> processing logs for: #{label}" if options.verbose?
    o.read.each_line do |raw|
      line = raw.strip
      # a blank line has no fields to parse and would crash the parser
      next if line.empty?

      db.save_log(aws_account, region, elb_ip, log_time, AWS::ELB::AccessLog.new(line))
    end

    # mark log as fetched
    db.mark_log_fetched(aws_account, region, elb_ip, log_time)
  end

  puts ">>> fetched #{num_logs} new log file(s)"

  # also cache geo locations if option is provided
  cache_geo if options.cache_geo?
end
desc "cache_geo", "cache geo location info for client ips in saved logs"
method_option :verbose, type: :boolean, aliases: '-v', desc: 'show verbose messages'
# Caches the geo location of every distinct client ip found in the saved
# logs and reports how many locations were newly cached.
def cache_geo
  puts "> caching geo locations for logged ips..."

  database = AWS::ELB::AccessLog::Database.instance
  num_ips = database.client_ips({unique: true}).count do |ip|
    location = database.cache_geo(ip)
    puts "> cached geo ip: #{location}" if location && options.verbose?
    location
  end

  puts ">>> cached #{num_ips} new geo location(s)"
end
desc "count_ips", "list number of unique ips per country"
# Prints, for every ELB found in the saved logs, the number of ips per country.
def count_ips
  puts "> counting ips per country..."

  database = AWS::ELB::AccessLog::Database.instance
  database.elbs.each do |elb_name|
    puts ">>> ELB: #{elb_name}"
    database.num_ips_per_country(elb_name).each do |row|
      puts "#{row[:country]}: #{row[:num_ips]}"
    end
    puts
  end
end
desc "info", "show info on database file"
# Prints a summary of the database: its file path, the time range covered
# by the saved logs, and the access / ip counts reported by Database#info.
def info
  puts "> printing info of database..."

  db = AWS::ELB::AccessLog::Database.instance
  puts ">>> database: #{db.filepath}"

  stats = db.info
  puts ">>> #{stats[:from]} ~ #{stats[:to]}"

  [
    ['number of accesses', :num_access_logs],
    ['number of ips', :num_ips]
  ].each do |title, section|
    puts ">>> #{title}"
    stats[section].each do |name, value|
      puts " #{name}: #{value}"
    end
  end
end
end
# On Ctrl-C: print a newline and exit with status 1.
Signal.trap('SIGINT') do
  puts
  exit 1
end

# Hand the command line over to the Thor command class.
Exec.start(ARGV)
# Gemfile
# Fetch gems over HTTPS; the plain-HTTP source is not protected in transit.
source 'https://rubygems.org'

gem 'meinside-ruby', github: 'meinside/meinside-ruby' # my ruby scripts and libraries
gem 'thor'
gem 'geocoder'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment