Skip to content

Instantly share code, notes, and snippets.

@shnjp
Created October 2, 2013 07:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shnjp/6790004 to your computer and use it in GitHub Desktop.
Save shnjp/6790004 to your computer and use it in GitHub Desktop.
# Fluentd input plugin, registered as 'udp_formatted'.
#
# Binds a UDP socket to bind/port, hands every received datagram to a
# Fluent::TextParser configured from the plugin's own configuration section,
# and emits each successfully parsed record under the configured tag.
class Fluent::UDPInput < Fluent::Input
  Fluent::Plugin.register_input('udp_formatted', self)

  # UDP port to bind to.
  config_param :port, :integer, :default => 5050
  # Local address to bind to.
  config_param :bind, :string, :default => '127.0.0.1'
  # Tag passed to Fluent::Engine.emit for every record (required, no default).
  config_param :tag, :string
  # Maximum number of bytes read per datagram. The default matches the
  # receive size this plugin previously hard-coded.
  config_param :body_size_limit, :size, :default => 5120

  # Builds the text parser from the same configuration section as the plugin.
  def configure(conf)
    super
    @parser = Fluent::TextParser.new
    @parser.configure(conf)
  end

  # Opens and binds the UDP socket, attaches a watcher for it to a fresh
  # Coolio loop, and runs that loop on a dedicated thread.
  def start
    @loop = Coolio::Loop.new
    @sock = Fluent::SocketUtil.create_udp_socket(@bind)
    @sock.bind(@bind, @port)
    @sock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)
    @watcher = UDPSocketWatcher.new(@sock, method(:on_receive), @body_size_limit)
    @loop.attach(@watcher)
    @thread = Thread.new(&method(:run))
  end

  # Stops the event loop and releases the socket.
  #
  # Watchers are detached and the loop thread is joined BEFORE the socket is
  # closed, so the loop thread is never left polling a closed descriptor.
  # The close sits in `ensure` so the socket is released even when stopping
  # the loop or joining the thread raises.
  def shutdown
    @loop.watchers.each { |w| w.detach }
    @loop.stop
    @thread.join
  ensure
    @sock.close if @sock && !@sock.closed?
  end

  # Body of the event-loop thread; logs anything that escapes the loop.
  def run
    @loop.run
  rescue => e
    $log.error "unexpected error", :error => e.to_s
    $log.error_backtrace
  end

  # Coolio watcher that reads one datagram per readable event and forwards
  # it to a callback as (host, port, msg).
  class UDPSocketWatcher < Coolio::IO
    DEFAULT_BODY_SIZE_LIMIT = 5120

    # io              - the UDP socket to read from
    # callback        - object responding to call(host, port, msg)
    # body_size_limit - maximum number of bytes read per datagram
    def initialize(io, callback, body_size_limit = DEFAULT_BODY_SIZE_LIMIT)
      super(io)
      @io = io
      @callback = callback
      @body_size_limit = body_size_limit
    end

    # Reads a single datagram and dispatches it to the callback.
    def on_readable
      begin
        # recvfrom_nonblock raises IO::WaitReadable instead of waiting when
        # no datagram is available, so the loop thread is never stalled here.
        msg, addr = @io.recvfrom_nonblock(@body_size_limit)
      rescue IO::WaitReadable, Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR
        return
      end
      host = addr[3]
      port = addr[1]
      @callback.call(host, port, msg)
    rescue => e
      # Log instead of silently dropping the failure; the exception is still
      # contained here so one bad datagram cannot take down the event loop.
      $log.error "unexpected error", :error => e.to_s
      $log.error_backtrace
    end
  end

  # Callback invoked by UDPSocketWatcher for every received datagram.
  # Parses the payload and emits it when the parser yields both a time and a
  # record; host and port are accepted but not used.
  def on_receive(host, port, msg)
    msg.chomp! # remove \n
    time, record = @parser.parse(msg)
    Fluent::Engine.emit(@tag, time, record) if time && record
  rescue => e
    $log.error "unexpected error", :error => e.to_s
    $log.error_backtrace
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment