Skip to content

Instantly share code, notes, and snippets.

@kiyoto
Created March 14, 2014 07:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kiyoto/9543713 to your computer and use it in GitHub Desktop.
Save kiyoto/9543713 to your computer and use it in GitHub Desktop.
Heroku HTTP log drain input plugin for Fluentd
module Fluent
  # Fluentd input plugin that receives Heroku Logplex HTTP drain requests.
  #
  # Each request body is split on "\n"; every non-empty line is parsed with
  # HEROKU_LOGPLEX_REGEXP and emitted under a tag derived from the request
  # path ("/foo/bar" => "foo.bar"). The number of non-empty lines is checked
  # against the Logplex-Msg-Count request header.
  class HerokuHttpInput < Input
    Plugin.register_input('heroku_http', self)

    include DetachMultiProcessMixin

    require 'http/parser'

    def initialize
      # Loaded here (lazily) because only request handling needs it.
      require 'webrick/httputils'
      super
    end

    config_param :port, :integer, :default => 9880
    config_param :bind, :string, :default => '0.0.0.0'
    config_param :body_size_limit, :size, :default => 32*1024*1024 # TODO default
    config_param :keepalive_timeout, :time, :default => 10 # TODO default
    config_param :backlog, :integer, :default => nil

    # c.f. https://github.com/heroku/logplex/blob/master/doc/README.http_drains.md
    HEROKU_LOGPLEX_REGEXP = /^\d+ \<(?<pri>[0-9]+)\>(?<pri_version>[1-9][0-9]{0,2}) (?<time>[^ ]+) (?<host>[^ ]*) (?<message>.*)$/

    # Builds the line parser used by #on_request.
    def configure(conf)
      super
      # TODO: can it have microseconds?
      @parser = TextParser::RegexpParser.new(HEROKU_LOGPLEX_REGEXP, "time_format" => "%Y-%m-%dT%H:%M:%S%z")
    end

    # Coolio timer (created with interval 1, repeating) that increments every
    # tracked connection's idle counter on each tick and closes connections
    # whose counter exceeds the configured timeout.
    class KeepaliveManager < Coolio::TimerWatcher
      # NOTE(review): not referenced anywhere in this file.
      class TimerValue
        def initialize
          @value = 0
        end
        attr_accessor :value
      end

      # @param timeout [Numeric] idle ticks allowed before a connection is closed
      def initialize(timeout)
        super(1, true)
        @cons = {}
        @timeout = timeout.to_i
      end

      def add(sock)
        @cons[sock] = sock
      end

      def delete(sock)
        @cons.delete(sock)
      end

      def on_timer
        # Walk a snapshot of the keys: closing a socket can invoke
        # Handler#on_close, which deletes that socket from @cons.
        @cons.keys.each do |sock|
          sock.close if sock.step_idle > @timeout
        end
      end
    end

    # Opens the TCP listener, then inside detach_multi_process sets up the
    # keepalive timer, the Coolio server and the event loop, and runs the
    # loop on a background thread.
    def start
      log.debug "listening http on #{@bind}:#{@port}"
      lsock = TCPServer.new(@bind, @port)

      detach_multi_process do
        super
        @km = KeepaliveManager.new(@keepalive_timeout)
        @lsock = Coolio::TCPServer.new(lsock, nil, Handler, @km, method(:on_request), @body_size_limit, log)
        @lsock.listen(@backlog) unless @backlog.nil?

        @loop = Coolio::Loop.new
        @loop.attach(@km)
        @loop.attach(@lsock)

        @thread = Thread.new(&method(:run))
      end
    end

    # Detaches all watchers, stops the loop, closes the listener and waits
    # for the loop thread to finish.
    def shutdown
      @loop.watchers.each { |w| w.detach }
      @loop.stop
      @lsock.close
      @thread.join
    end

    # Event-loop thread body; unexpected errors are logged, not re-raised.
    def run
      @loop.run
    rescue
      log.error "unexpected error", :error=>$!.to_s
      log.error_backtrace
    end

    # Emits one event per non-empty line of a Logplex request body.
    #
    # @param path_info [String] request path; "/a/b" becomes tag "a.b"
    # @param params [Hash] query parameters merged with HTTP_* header entries
    # @param body [String] raw request body
    # @return [Array] [status line, header Hash, body String]:
    #   "500" if Engine.emit raises, "400" if the number of non-empty lines
    #   differs from Logplex-Msg-Count, otherwise "200".
    def on_request(path_info, params, body)
      path = path_info[1..-1] # remove /
      tag = path.split('/').join('.')
      logplex_msg_count = params['HTTP_LOGPLEX_MSG_COUNT'].to_i

      body.split("\n").each do |msg|
        next if msg == ""
        time, record = @parser.call(msg)
        # Count every non-empty line, even unparseable ones, so the check
        # against Logplex-Msg-Count reflects what was actually sent.
        logplex_msg_count -= 1
        # A line the parser could not turn into a record has nothing to emit.
        next if record.nil?
        begin
          Engine.emit(tag, time, record)
        rescue
          return ["500 Internal Server Error", {'Content-type'=>'text/plain'}, "500 Internal Server Error\n#{$!}\n"]
        end
      end

      unless logplex_msg_count.zero?
        log.warn "Logplex-Msg-Count and body did not match"
        return ["400 Bad Request", {'Content-type'=>'text/plain'}, "Logplex-Msg-Count and body did not match"]
      end

      ["200 OK", {'Content-type'=>'text/plain'}, ""]
    end

    # One accepted HTTP connection. Feeds incoming bytes to an Http::Parser
    # and reacts to its callbacks (headers, body chunks, message complete).
    class Handler < Coolio::Socket
      def initialize(io, km, callback, body_size_limit, log)
        super(io)
        @km = km
        @callback = callback
        @body_size_limit = body_size_limit
        @content_type = ""
        @logplex_msg_count = nil
        @next_close = false
        @log = log
        @idle = 0
        @km.add(self)

        @remote_port, @remote_addr = *Socket.unpack_sockaddr_in(io.getpeername) rescue nil
      end

      # Increments and returns the idle tick count (reset to 0 on every read).
      def step_idle
        @idle += 1
      end

      def on_close
        @km.delete(self)
      end

      def on_connect
        @parser = Http::Parser.new(self)
      end

      # Any error while parsing is logged and the connection is closed.
      def on_read(data)
        @idle = 0
        @parser << data
      rescue
        @log.warn "unexpected error", :error=>$!.to_s
        @log.warn_backtrace
        close
      end

      def on_message_begin
        @body = ''
      end

      # Records headers as HTTP_* entries, decides keep-alive, and answers an
      # Expect header (100 Continue, 413 or 417) before the body arrives.
      def on_headers_complete(headers)
        expect = nil
        size = nil

        # HTTP/1.1 defaults to a persistent connection; Connection may override.
        @keep_alive = (@parser.http_version == [1, 1])

        # Per-request state: a keep-alive connection must not reuse the
        # previous request's Content-Type when this request omits it.
        @content_type = ""

        @env = {}
        headers.each_pair do |k, v|
          @env["HTTP_#{k.gsub('-','_').upcase}"] = v
          # Anchored so e.g. "Proxy-Connection" does not match "Connection".
          case k
          when /\AExpect\z/i
            expect = v
          when /\AContent-Length\z/i
            size = v.to_i
          when /\AContent-Type\z/i
            @content_type = v
          when /\AConnection\z/i
            if v =~ /close/i
              @keep_alive = false
            elsif v =~ /Keep-alive/i
              @keep_alive = true
            end
          end
        end

        return unless expect

        # Expectation tokens are case-insensitive.
        if expect.casecmp('100-continue').zero?
          # Same limit as on_body: a body of exactly @body_size_limit is accepted.
          if !size || size <= @body_size_limit
            send_response_nobody("100 Continue", {})
          else
            send_response_and_close("413 Request Entity Too Large", {}, "Too large")
          end
        else
          send_response_and_close("417 Expectation Failed", {}, "")
        end
      end

      # Accumulates body chunks; answers 413 and closes once the limit is exceeded.
      def on_body(chunk)
        if @body.bytesize + chunk.bytesize > @body_size_limit
          send_response_and_close("413 Request Entity Too Large", {}, "Too large") unless closing?
          return
        end
        @body << chunk
      end

      # Validates the request, hands it to the plugin callback and writes the
      # response. Every path that does not already close the connection
      # sends a response, so the client is never left waiting.
      def on_message_complete
        return if closing?

        @env['REMOTE_ADDR'] = @remote_addr if @remote_addr
        params = WEBrick::HTTPUtils.parse_query(@parser.query_string)

        unless @content_type =~ /\Aapplication\/logplex/
          @log.warn "request with non-Logplex content type:#{@content_type}"
          return finish_response("415 Unsupported Media Type", {}, "request with non-Logplex content type")
        end

        path_info = @parser.request_path
        params.merge!(@env)
        @env.clear

        unless params['HTTP_LOGPLEX_MSG_COUNT']
          @log.warn "request without Logplex-Msg-Count"
          return finish_response("400 Bad Request", {}, "request without Logplex-Msg-Count")
        end

        code, header, body = *@callback.call(path_info, params, @body)
        finish_response(code, header, body)
      end

      # Sends a response and keeps the connection open only when the client
      # negotiated keep-alive; otherwise closes it after the write completes.
      def finish_response(code, header, body)
        body = body.to_s
        if @keep_alive
          header['Connection'] = 'Keep-Alive'
          send_response(code, header, body)
        else
          send_response_and_close(code, header, body)
        end
      end

      # Closes the connection once buffered output has been written, if a
      # close was requested via send_response_and_close.
      def on_write_complete
        close if @next_close
      end

      def send_response_and_close(code, header, body)
        send_response(code, header, body)
        @next_close = true
      end

      def closing?
        @next_close
      end

      # Writes status line, headers (defaulting Content-length and
      # Content-type) and body.
      def send_response(code, header, body)
        header['Content-length'] ||= body.bytesize
        header['Content-type'] ||= 'text/plain'

        data = %[HTTP/1.1 #{code}\r\n]
        header.each_pair do |k, v|
          data << "#{k}: #{v}\r\n"
        end
        data << "\r\n"
        write data

        write body
      end

      # Writes a status line and headers with no body (used for 100 Continue).
      def send_response_nobody(code, header)
        data = %[HTTP/1.1 #{code}\r\n]
        header.each_pair do |k, v|
          data << "#{k}: #{v}\r\n"
        end
        data << "\r\n"
        write data
      end
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment