Skip to content

Instantly share code, notes, and snippets.

@yomon8
Last active May 13, 2017 15:10
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yomon8/8ca89d938aa065f391ba7be14b75b03f to your computer and use it in GitHub Desktop.
Save yomon8/8ca89d938aa065f391ba7be14b75b03f to your computer and use it in GitHub Desktop.
Amazon ALB (Application Load Balancer) log input plugin for Fluentd
# Fluentd input plugin ("alb_log") that polls an S3 bucket for Amazon ALB
# access-log objects, gunzips each new object into a local buffer file and
# emits one event per parsed access-log line under the configured tag.
#
# Progress is persisted in `timestamp_file` as the Unix time (seconds) parsed
# from the key of the most recently processed log object; on each poll only
# objects whose key timestamp is strictly greater are processed, oldest first.
class Fluent::Alb_LogInput < Fluent::Input
  Fluent::Plugin.register_input('alb_log', self)

  # Parses an S3 object key of the shape
  #   [<prefix>/]AWSLogs/<12-digit account>/elasticloadbalancing/<region>/<YYYY/MM/DD>/
  #   <12 digits>_elasticloadbalancing_..._<alb name>_<YYYYMMDDTHHMMZ>_<ip>_<hash>.log.gz
  LOGFILE_REGEXP = /^((?<prefix>.+?)\/|)AWSLogs\/(?<account_id>[0-9]{12})\/elasticloadbalancing\/(?<region>.+?)\/(?<logfile_date>[0-9]{4}\/[0-9]{2}\/[0-9]{2})\/[0-9]{12}_elasticloadbalancing_.+?_(?<logfile_alb_name>[^_]+)_(?<alb_timestamp>[0-9]{8}T[0-9]{4}Z)_(?<alb_ip_address>.+?)_(?<logfile_hash>.+)\.log\.gz$/

  # Parses one ALB access-log line into the named fields emitted by
  # emit_lines_from_buffer_file.
  ACCESSLOG_REGEXP = /^(?<type>.+?) (?<time>\d{4}-\d{2}-\d{2}T\d{2}\:\d{2}\:\d{2}\.\d{6}Z) (?<alb_fullname>[^\/]+\/(?<alb>[^\/]+?)\/.+?) (?<client>[^ ]+)\:(?<client_port>.+?) (-|(?<target>[^ ]+)\:(?<target_port>.+?)) (?<request_processing_time>.+?) (?<target_processing_time>.+?) (?<response_processing_time>.+?) (?<elb_status_code>.+?) (?<target_status_code>.+?) (?<received_bytes>.+?) (?<sent_bytes>.+?) \"(?<request_method>.+?) (?<request_uri>[httpsfile]+:\/{2,3}(?<domain>[0-9a-z\.\-:\[\]]+?):?[0-9]{0,5}?(|\/\S*)?) (?<request_protocol>.+?)\" \"(\"|(?<user_agent>.+?)\") (?<ssl_cipher>.+?) (?<ssl_protocol>.+?) (-|(?<target_group_arn>[^\/]+\/(?<target_group_name>[^\/]+?)\/.+?)?) \"(?<trace_id>[^\"]+)\"$/

  # Define `router` method to support v0.10.57 or earlier
  unless method_defined?(:router)
    define_method("router") { Fluent::Engine }
  end

  config_param :access_key_id, :string, :default => nil, :secret => true
  config_param :secret_access_key, :string, :default => nil, :secret => true
  config_param :region, :string, :default => nil
  config_param :s3_bucketname, :string, :default => nil
  config_param :s3_prefix, :string, :default => nil
  config_param :tag, :string, :default => 'alb.access'
  config_param :timestamp_file, :string, :default => nil
  config_param :refresh_interval, :integer, :default => 300
  config_param :buf_file, :string, :default => './fluentd_alb_log_buf_file'
  config_param :http_proxy, :string, :default => nil
  config_param :start_time, :string, :default => nil

  # Loads runtime dependencies and validates the configuration.
  #
  # Raises Fluent::ConfigError when region, s3_bucketname or timestamp_file is
  # missing, when no credentials are available and no access keys are given,
  # or when start_time cannot be parsed as a time.
  def configure(conf)
    super
    require 'aws-sdk'
    require 'multiple_files_gzip_reader'
    require 'time'

    raise Fluent::ConfigError.new("region is required") unless @region
    unless has_iam_role?
      raise Fluent::ConfigError.new("access_key_id is required") if @access_key_id.nil?
      raise Fluent::ConfigError.new("secret_access_key is required") if @secret_access_key.nil?
    end
    raise Fluent::ConfigError.new("s3_bucketname is required") unless @s3_bucketname
    raise Fluent::ConfigError.new("timestamp_file is required") unless @timestamp_file

    # Validate start_time up front: an unparseable value would otherwise make
    # every poll fail while computing the default starting timestamp.
    if @start_time
      begin
        Time.parse(@start_time)
      rescue ArgumentError => e
        raise Fluent::ConfigError.new("start_time is invalid: #{e.message}")
      end
    end
  end

  # Creates the state/buffer files, verifies the bucket is visible, runs the
  # first poll synchronously (inside TimerWatcher#initialize) and then starts
  # a background thread that re-polls every refresh_interval seconds.
  def start
    super
    # files touch
    File.open(@timestamp_file, File::RDWR | File::CREAT).close
    File.open(@buf_file, File::RDWR | File::CREAT).close
    raise StandardError.new("s3 bucket not found #{@s3_bucketname}") unless s3bucket_is_ok
    @loop = Coolio::Loop.new
    timer_trigger = TimerWatcher.new(@refresh_interval, true, &method(:input))
    timer_trigger.attach(@loop)
    @thread = Thread.new(&method(:run))
  end

  # Stops the event loop and waits for the polling thread to finish.
  def shutdown
    super
    # start may have raised before the loop or thread were created.
    @loop.stop if @loop
    @thread.join if @thread
  end

  private

  # Returns true when an EC2 client for @region resolves credentials from the
  # SDK's default sources; false when it does not or the client errors.
  def has_iam_role?
    ec2 = Aws::EC2::Client.new(region: @region)
    !ec2.config.credentials.nil?
  rescue => e
    $log.warn "EC2 Client error occurred: #{e.message}"
    false
  end

  # Returns the Unix time (Integer seconds) of the last processed log object.
  # It is read from @timestamp_file, falling back to start_time (or the epoch)
  # when the file is empty. Returns nil after logging a warning on failure.
  def get_timestamp_file
    start_time = @start_time ? Time.parse(@start_time).utc : Time.at(0)
    timestamp = start_time.to_i
    $log.debug "timestamp file #{@timestamp_file} read"
    File.open(@timestamp_file, File::RDONLY) do |file|
      timestamp = file.read.to_i if file.size > 0
    end
    $log.debug "timestamp start at:#{Time.at(timestamp)}"
    timestamp
  rescue => e
    $log.warn "timestamp file get and parse error occurred: #{e.message}"
    nil
  end

  # Overwrites @timestamp_file with +timestamp+. Failures are logged, not raised.
  def put_timestamp_file(timestamp)
    $log.debug "timestamp file #{@timestamp_file} write"
    File.open(@timestamp_file, File::WRONLY | File::CREAT | File::TRUNC) do |file|
      file.puts timestamp.to_s
    end
  rescue => e
    $log.warn "timestamp file write error occurred: #{e.message}"
  end

  # Returns a memoized Aws::S3::Client for @region. It uses the explicit access
  # keys and http_proxy when configured. Returns nil after logging if the
  # client cannot be built; the next call then retries construction.
  def s3_client
    @s3_client ||= begin
      options = { :region => @region }
      if @access_key_id && @secret_access_key
        options[:access_key_id] = @access_key_id
        options[:secret_access_key] = @secret_access_key
      end
      options[:http_proxy] = @http_proxy if @http_proxy
      $log.debug "S3 client connect"
      Aws::S3::Client.new(options)
    rescue => e
      $log.warn "S3 Client error occurred: #{e.message}"
      nil
    end
  end

  # True if @s3_bucketname appears in the account's bucket list; false if it
  # does not or the S3 call fails.
  def s3bucket_is_ok
    $log.debug "search bucket #{@s3_bucketname}"
    s3_client.list_buckets.buckets.any? { |bucket| bucket.name == @s3_bucketname }
  rescue => e
    $log.warn "S3 Client error occurred: #{e.message}"
    false
  end

  # One polling cycle. It fetches the new log objects oldest first, emits
  # their lines, and advances the timestamp file after each object.
  def input
    $log.debug "start"
    timestamp = get_timestamp_file
    # Without a known starting point we cannot tell which objects are new;
    # skip this cycle rather than reprocess everything or crash.
    return if timestamp.nil?

    object_keys = sort_object_key(get_object_keys(timestamp))
    $log.info "processing #{object_keys.count} object(s)."
    object_keys.each do |object_key|
      record_common = {
        "account_id" => object_key[:account_id],
        "region" => object_key[:region],
        "logfile_date" => object_key[:logfile_date],
        "logfile_alb_name" => object_key[:logfile_alb_name],
        "alb_ip_address" => object_key[:alb_ip_address],
        "logfile_hash" => object_key[:logfile_hash],
        "alb_timestamp" => object_key[:alb_timestamp],
        "key" => object_key[:key],
        "prefix" => object_key[:prefix],
        "alb_timestamp_unixtime" => object_key[:alb_timestamp_unixtime],
      }
      # Stop on a failed download so the timestamp is not advanced past an
      # object whose lines were never emitted; it is retried next cycle.
      break unless get_file_from_s3(object_key[:key])
      emit_lines_from_buffer_file(record_common)
      put_timestamp_file(object_key[:alb_timestamp_unixtime])
    end
  rescue => e
    # input runs from TimerWatcher (first call inside start, later calls in
    # the Coolio loop thread); an escaping exception could abort startup or
    # end polling, so log it and wait for the next tick instead.
    $log.error "error occurred: #{e.message}"
  end

  # Lists objects under @s3_prefix whose key matches LOGFILE_REGEXP and whose
  # key timestamp is newer than +timestamp+. It returns an Array of Hashes with
  # the parsed key fields plus :alb_timestamp_unixtime. On error it returns []
  # after logging.
  def get_object_keys(timestamp)
    object_keys = []
    objects = s3_client.list_objects(
      bucket: @s3_bucketname,
      max_keys: 100,
      prefix: @s3_prefix,
    )
    objects.each do |object|
      object.contents.each do |content|
        matches = LOGFILE_REGEXP.match(content.key)
        next unless matches
        alb_timestamp_unixtime = Time.parse(matches[:alb_timestamp]).to_i
        # snip old items.
        # NOTE(review): the strict `<=` check also skips an object that shares
        # the last processed key timestamp but was listed only after it was
        # processed.
        next if alb_timestamp_unixtime <= timestamp
        $log.debug content.key
        object_keys << {
          key: content.key,
          prefix: matches[:prefix],
          account_id: matches[:account_id],
          region: matches[:region],
          logfile_date: matches[:logfile_date],
          logfile_alb_name: matches[:logfile_alb_name],
          alb_timestamp: matches[:alb_timestamp],
          alb_ip_address: matches[:alb_ip_address],
          logfile_hash: matches[:logfile_hash],
          alb_timestamp_unixtime: alb_timestamp_unixtime,
        }
      end
    end
    object_keys
  rescue => e
    $log.warn "error occurred: #{e.message}"
    []
  end

  # Returns the object keys ordered by ascending :alb_timestamp_unixtime
  # ([] after logging if they cannot be sorted, e.g. nil input).
  def sort_object_key(src_object_keys)
    src_object_keys.sort_by { |object_key| object_key[:alb_timestamp_unixtime] }
  rescue => e
    $log.warn "error occurred: #{e.message}"
    []
  end

  # Downloads +object_name+ from the bucket, gunzips it and overwrites
  # @buf_file with the result. It returns true on success. It returns false
  # after logging on any error; a failure while downloading or decompressing
  # leaves @buf_file untouched because the file is only opened afterwards.
  def get_file_from_s3(object_name)
    $log.debug "getting object from s3 name is #{object_name}"
    s3_object = s3_client.get_object(bucket: @s3_bucketname, key: object_name)
    content = MultipleFilesGzipReader.new(s3_object.body).read
    File.open(@buf_file, File::WRONLY | File::CREAT | File::TRUNC) do |file|
      file.write(content)
    end
    true
  rescue => e
    $log.warn "error occurred: #{e.message}"
    false
  end

  # Emits one event per line of @buf_file that matches ACCESSLOG_REGEXP.
  # Each event merges +record_common+ with the parsed fields. Non-matching
  # lines are logged and skipped; any other error is logged and ends emission
  # for this file.
  def emit_lines_from_buffer_file(record_common)
    File.open(@buf_file, File::RDONLY) do |file|
      file.each_line do |line|
        line_match = ACCESSLOG_REGEXP.match(line)
        unless line_match
          $log.info "nomatch log found: #{line} in #{record_common['key']}"
          next
        end
        record = {
          "type" => line_match[:type],
          "time" => line_match[:time],
          "alb_fullname" => line_match[:alb_fullname],
          "alb" => line_match[:alb],
          "client" => line_match[:client],
          "client_port" => line_match[:client_port],
          "target" => line_match[:target],
          "target_port" => line_match[:target_port],
          "request_processing_time" => line_match[:request_processing_time].to_f,
          "target_processing_time" => line_match[:target_processing_time].to_f,
          "response_processing_time" => line_match[:response_processing_time].to_f,
          "elb_status_code" => line_match[:elb_status_code],
          "target_status_code" => line_match[:target_status_code],
          "received_bytes" => line_match[:received_bytes].to_i,
          "sent_bytes" => line_match[:sent_bytes].to_i,
          "request_method" => line_match[:request_method],
          "request_uri" => line_match[:request_uri],
          "domain" => line_match[:domain],
          "request_protocol" => line_match[:request_protocol],
          "user_agent" => line_match[:user_agent],
          "ssl_cipher" => line_match[:ssl_cipher],
          "ssl_protocol" => line_match[:ssl_protocol],
          "target_group_arn" => line_match[:target_group_arn],
          "target_group_name" => line_match[:target_group_name],
          "trace_id" => line_match[:trace_id]
        }
        router.emit(@tag, Fluent::Engine.now, record_common.merge(record))
      end
    end
  rescue => e
    $log.warn "error occurred: #{e.message}"
  end

  # Body of the polling thread: runs the Coolio loop until shutdown stops it.
  def run
    @loop.run
  end

  # Repeating Coolio timer that invokes +callback+ once immediately, during
  # construction, and then on every tick after being attached to a loop.
  class TimerWatcher < Coolio::TimerWatcher
    def initialize(interval, repeat, &callback)
      @callback = callback
      on_timer # first call
      super(interval, repeat)
    end

    def on_timer
      @callback.call
    end
  end
end
@yomon8
Copy link
Author

yomon8 commented Dec 12, 2016

This code was forked from fluent-plugin-elb-log.
https://github.com/shinsaka/fluent-plugin-elb-log

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment