Skip to content

Instantly share code, notes, and snippets.

@chobie
Created October 15, 2011 14:42
Show Gist options
  • Save chobie/1289660 to your computer and use it in GitHub Desktop.
Save chobie/1289660 to your computer and use it in GitHub Desktop.
in_memcached.rb
module Fluent
require 'pp'
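# FIXME: the require below uses a hard-coded absolute path into one local gem install; it should be a load-path-relative require (help wanted, I'm a Ruby newbie).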
require '/usr/local/lib/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluent-0.9.20/lib/fluent/plugin/memcache_parser'
class MemcachedInput < Input
# Registers this class with Fluent's Plugin registry under the input name 'memcached'
# (presumably the name a configuration uses to select this input; confirm against Plugin).
Plugin.register_input('memcached', self)
# Sets the built-in defaults; #configure may override each of them later.
def initialize
  # Loaded here, at instantiation time, rather than when the file is read.
  require 'socket'
  @bind = '0.0.0.0'
  @port = 11211
  # TODO default
  @body_size_limit = 32 << 20 # 32 MiB, same value as 32*1024*1024
end
# Applies settings from +conf+ on top of the current values.
#
# Keys read: 'port', 'bind', 'server_type', 'framed', 'body_size_limit'.
# A missing 'port' or 'bind' keeps whatever value is already set.
def configure(conf)
  # The port is always coerced to an Integer, whether it came from conf or not.
  @port = (conf['port'] || @port).to_i
  @bind = conf['bind'] || @bind
  @server_type = conf['server_type'] || 'nonblocking'
  # Framing stays on unless the setting stringifies to exactly "false".
  @is_framed = conf['framed'].to_s != "false"

  limit = conf['body_size_limit']
  @body_size_limit = Config.size_value(limit) if limit
end
# Brings the input up: creates the event loop, attaches the listening
# server returned by #listen, and runs #run on a new thread.
# Returns the Thread, which is also kept in @thread.
def start
  @loop = Coolio::Loop.new
  @lsock = listen
  @loop.attach(@lsock)
  @thread = Thread.new { run }
end
# Logs the bind address and builds the TCP server for it.
# Each connection is handled by Handler, which is given #on_message as its callback.
def listen
  $log.debug "listening memcached on #{@bind}:#{@port}"
  callback = method(:on_message)
  Coolio::TCPServer.new(@bind, @port, Handler, callback)
end
# Stops the input: closes the listening socket, then stops the event loop.
#
# Safe to call when #start never ran or failed part-way (for example the
# bind in #listen raised): whichever of @lsock / @loop was never assigned is
# skipped, so teardown does not raise NoMethodError on nil and hide the
# original failure.
#
# Returns whatever @loop.stop returns, or nil when there is no loop.
def shutdown
  # Close the listener first so no new connections are accepted.
  @lsock.close if @lsock
  @loop.stop if @loop
end
# Thread body: runs the event loop until it returns.
# Any StandardError escaping the loop is logged (message plus backtrace)
# instead of propagating out of the thread.
def run
  begin
    @loop.run
  rescue => e
    $log.error "unexpected error", :error => e.to_s
    $log.error_backtrace
  end
end
# Callback handed to each Handler. Reads msg[0] (stringified) as the tag and
# msg[1] as the entries, and writes both to the trace log; nothing is emitted.
def on_message(msg)
  tag, entries = msg[0].to_s, msg[1]
  $log.trace { tag }
  $log.trace { entries }
end
# Per-connection socket handler: accumulates incoming bytes and feeds them
# to a Parser until the parser reports a finished command.
class Handler < Coolio::Socket
  # io         - the accepted socket; also handed to the Parser.
  # on_message - callable kept in @on_message (this class never invokes it).
  def initialize(io, on_message)
    super(io)
    @nread = 0
    @buffer = ""
    @parser = Parser.new(io)
    # SO_LINGER takes a packed struct linger { l_onoff, l_linger }.
    # NOTE(review): @timeout is never assigned in this class, so unless the
    # superclass sets it this is nil.to_i == 0 (linger on, zero seconds) -
    # confirm that is intended.
    opt = [1, @timeout.to_i].pack('I!I!')
    io.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, opt)
    # The original message ended at "object_id=" without the id itself.
    $log.trace { "accepted fluent socket object_id=#{object_id}" }
    @on_message = on_message
  end

  # No per-connection setup is needed.
  def on_connect
  end

  # Appends +data+ to the pending buffer and resumes parsing at @nread, the
  # offset returned by the previous execute call. Once the parser reports a
  # finished command the buffer and offset are reset.
  # Errors raised while parsing are only written to the trace log.
  def on_read(data)
    @buffer << data
    @nread = @parser.execute(@buffer, @nread)
    if @parser.finished?
      @buffer = ""
      @nread = 0
    end
  rescue
    $log.trace {"client error: #{$!}"}
  end

  # No per-connection teardown is needed.
  def on_close
  end
end
# Memcached-protocol parser that turns each `set` into a Fluent event:
# the key becomes the tag and the JSON-decoded value becomes the record.
# The memcache_* methods are the callbacks this class registers for by
# passing itself to the superclass constructor.
class Parser < ::MemcacheParser::Parser
  # JSON is used below; required here so the class does not depend on some
  # other file having loaded it first.
  require 'json'

  # io - the client socket that protocol replies are written to.
  def initialize(io)
    @io = io
    super(self)
  end

  # Nothing is ever stored, so every lookup is a miss. The protocol still
  # requires the terminating END line; without it the client waits forever.
  def memcache_get(keys)
    #puts "get #{keys.inspect}"
    @io.write "END\r\n"
  end

  # Emits +data+ (a JSON object) as an event tagged +key+ at the current time.
  #
  # The value is decoded and emitted BEFORE the client is told STORED, so a
  # payload that cannot be used is answered with CLIENT_ERROR instead of being
  # acknowledged and then dropped. +flags+ and +exptime+ are not used.
  # +noreply+ suppresses the reply line, as the protocol specifies.
  def memcache_set(key, data, flags, exptime, noreply)
    #puts "set #{key.inspect}:#{data.inspect}"
    record = decode_record(data)
    if record.nil?
      $log.warn "memcached set rejected: value is not a JSON object", :key => key
      send_reply "CLIENT_ERROR value must be a JSON object\r\n", noreply
      return
    end

    event = Event.new(Time.now.to_i, record)
    Engine.emit(key, event)
    send_reply "STORED\r\n", noreply
  end

  private

  # Returns the decoded Hash, or nil when +data+ is not valid JSON or does
  # not decode to a Hash.
  def decode_record(data)
    record = JSON.parse(data)
    record.is_a?(Hash) ? record : nil
  rescue JSON::ParserError, TypeError
    nil
  end

  # Writes +line+ unless the client asked for no reply.
  # NOTE(review): the type the parser passes for noreply is not visible here,
  # so nil, false, 0 and "" are all treated as "reply wanted" - confirm
  # against the parser.
  def send_reply(line, noreply)
    case noreply
    when nil, false, 0, '' then @io.write line
    end
  end
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment