Skip to content

Instantly share code, notes, and snippets.

@mikeda
Created August 17, 2012 05:45
Show Gist options
  • Save mikeda/d9f800c1a5aa73e125cb to your computer and use it in GitHub Desktop.
Save mikeda/d9f800c1a5aa73e125cb to your computer and use it in GitHub Desktop.
PostfixのmaillogをtailするFluentdプラグイン
#@config
#<source>
# type maillog_tail
# path /var/log/maillog
# tag maillog.test
#</source>
#
#@output
#2012-08-05T04:02:27+09:00 maillog.test {
# "qid":"AF06E10339D7",
# "from":"mikeda@mikeda.jp",
# "to":"oranie@oranie.jp",
# "orig_to":null,
# "size":"114364",
# "relay":"local",
# "delay":"1.5",
# "status":"sent"
#}
module Fluent
# Parses Postfix maillog lines and joins the "from=" and "to=" lines that
# share a queue ID into one record per delivery attempt.
#
# #configure must be called before #parse.
class MaillogParser
  # Maximum number of per-message records kept in memory, and the number of
  # oldest records dropped once that limit is reached.
  MAX_BUFFER_LENGTH = 1024
  CLEAR_BUFFER_LENGTH = 512

  # Words that appear in the queue-ID position on error-type lines.
  # Those lines are ignored for now.
  ERROR_PREFIXES = %w(warning fatal panic reject reject_warning hold discard).freeze

  # Commands whose lines never produce a from/to update (ignored for now).
  IGNORED_COMMANDS = %w(cleanup master smtpd).freeze

  QID_REGEXP = /^([^:]+): (.*)/
  FROM_REGEXP = /^from=<([^>]*)>, size=(\d+)/
  TO_REGEXP = /^to=<([^>]*)>, (?:orig_to=<([^>]*)>, )?relay=([^,]+), (?:conn_use=[^,]+, )?delay=([^,]+), (?:delays=[^,]+, )?(?:dsn=[^,]+, )?status=(\S+)(.*)$/

  # Message text logged when a message leaves the queue.
  REMOVED = 'removed'.freeze

  DEFAULT = {
    'regexp' => /^(?<time>[^ ]*\s*[^ ]* [^ ]*) [^ ]* \w+\/(?<cmd>[^\[]+)\[\d+\]: (?<message>.*)$/,
    'time_format' => "%b %d %H:%M:%S",
  }.freeze
  # Original (misspelled) constant name, kept so existing references still work.
  DEFALUT = DEFAULT

  def initialize
    @records = {}
  end

  # Feeds one log line to the parser.
  #
  # Returns [time, record] when the line is a "to=" delivery line, where
  # time is the epoch second stored for the message and record is a Hash
  # with qid/from/size/to/orig_to/relay/delay/status. Returns [nil, nil]
  # for every other line.
  def parse(line)
    m = @regexp.match(line)
    return nil, nil unless m

    time = Time.strptime(m['time'], @time_format).to_i
    cmd = m['cmd']
    body = QID_REGEXP.match(m['message'].lstrip)
    return nil, nil unless body

    qid, message = body.captures
    message = message.lstrip
    return nil, nil if ERROR_PREFIXES.include?(qid)

    # The message has left the queue: no more recipients will follow, so
    # this is the point where its buffered state can be released.
    if message.rstrip == REMOVED
      @records.delete(qid)
      return nil, nil
    end

    record = fetch_record(qid, time)
    return nil, nil if IGNORED_COMMANDS.include?(cmd)

    if (sender = FROM_REGEXP.match(message))
      remember_sender(record, sender, time)
      return nil, nil
    end
    if (delivery = TO_REGEXP.match(message))
      return build_event(record, delivery)
    end

    return nil, nil
  end

  # Reads 'format' (a regexp written as "/.../") and 'time_format' from
  # conf, falling back to DEFAULT for either one.
  #
  # Raises ConfigError when 'format' is not a valid regexp, and
  # RuntimeError when the regexp lacks the time/cmd/message named captures.
  # Returns true.
  def configure(conf)
    if (format = conf['format'])
      begin
        # Strip the surrounding slashes.
        @regexp = Regexp.new(format[1..-2])
      rescue => e
        raise ConfigError, "Invalid regexp '#{format[1..-2]}': #{e}"
      end
    else
      @regexp = DEFAULT['regexp']
    end
    @time_format = conf['time_format'] || DEFAULT['time_format']

    captures = @regexp.named_captures
    raise "No named captures" if captures.empty?
    %w(time cmd message).each do |name|
      raise "Missing required named capture:<#{name}>" unless captures.key?(name)
    end
    true
  end

  private

  # Returns the buffered record for qid, creating it when absent.
  def fetch_record(qid, time)
    existing = @records[qid]
    return existing if existing

    # Make room BEFORE inserting, so the record we are about to use can
    # never be among the evicted ones, whatever its timestamp.
    evict_oldest_records if @records.length >= MAX_BUFFER_LENGTH
    @records[qid] = {
      'time' => time,
      'qid' => qid,
    }
  end

  # Drops the CLEAR_BUFFER_LENGTH records with the smallest 'time'.
  def evict_oldest_records
    del_keys = @records.sort_by { |_, v| v['time'] }.first(CLEAR_BUFFER_LENGTH).map { |r| r[0] }
    del_keys.each { |k| @records.delete(k) }
    $log.warn "delete record:#{del_keys}"
  end

  # Stores sender and size from a "from=" line.
  def remember_sender(record, sender, time)
    if record.key?('from')
      # A repeated "from=" line for a message we already know (Postfix logs
      # it again when a deferred message is retried): restart the clock so
      # the retry is stamped with its own time, as it was when records were
      # dropped after each delivery line.
      record['time'] = time
    else
      record['from'], record['size'] = sender.captures
    end
  end

  # Builds [time, event] for a "to=" line. Works on a copy so that further
  # recipients of the same message still see from/size.
  def build_event(record, delivery)
    to, orig_to, relay, delay, status = delivery.captures
    event = record.dup
    event_time = event.delete('time')
    event["to"] = to
    event["orig_to"] = orig_to
    event["relay"] = relay
    event["delay"] = delay
    event["status"] = status
    return event_time, event
  end
end
# TailInput subclass registered as the `maillog_tail` input type; it swaps
# in MaillogParser as the line parser.
class MaillogTail < TailInput
  Fluent::Plugin.register_input('maillog_tail', self)

  # Creates a MaillogParser, stores it in @parser and configures it from
  # +conf+. Returns whatever MaillogParser#configure returns.
  def configure_parser(conf)
    maillog_parser = MaillogParser.new
    @parser = maillog_parser
    maillog_parser.configure(conf)
  end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment