Skip to content

Instantly share code, notes, and snippets.

@yuki24
Created October 16, 2011 09:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yuki24/1290715 to your computer and use it in GitHub Desktop.
Save yuki24/1290715 to your computer and use it in GitHub Desktop.
fluent-plugin-redis 0.2.0 for fluentd 0.10.0
module Fluent
class RedisOutput < BufferedOutput
Fluent::Plugin.register_output('redis', self)
attr_reader :host, :port, :db_number, :redis
def initialize
super
require 'redis'
require 'msgpack'
end
def configure(conf)
super
@host = conf.has_key?('host') ? conf['host'] : 'localhost'
@port = conf.has_key?('port') ? conf['port'].to_i : 6379
@db_number = conf.has_key?('db_number') ? conf['db_number'].to_i : nil
if conf.has_key?('namespace')
$log.warn "namespace option has been removed from fluent-plugin-redis 0.1.3. Please add or remove the namespace '#{conf['namespace']}' manually."
end
end
def start
super
@redis = Redis.new(:host => @host, :port => @port,
:thread_safe => true, :db => @db_number)
end
def shutdown
@redis.quit
end
def format(tag, time, record)
identifier = [tag, time].join(".")
[identifier, record].to_msgpack
end
def write(chunk)
@redis.pipelined {
chunk.open { |io|
begin
MessagePack::Unpacker.new(io).each.each_with_index { |record, index|
@redis.mapped_hmset "#{record[0]}.#{index}", record[1]
}
rescue EOFError
# EOFError always occured when reached end of chunk.
end
}
}
end
end
end
require 'fluent/test'
require 'fluent/plugin/out_redis'
class FileOutputTest < Test::Unit::TestCase
def setup
Fluent::Test.setup
@d = create_driver %[
host localhost
port 6379
db_number 1
]
@time = Time.parse("2011-01-02 13:14:15 UTC").to_i
end
def create_driver(conf = CONFIG)
Fluent::Test::BufferedOutputTestDriver.new(Fluent::RedisOutput).configure(conf)
end
def test_configure
assert_equal 'localhost', @d.instance.host
assert_equal 6379, @d.instance.port
assert_equal 1, @d.instance.db_number
end
def test_format
@d.emit({"a"=>1}, @time)
@d.expect_format(["test.#{@time}", {"a"=>1}].to_msgpack)
@d.run
end
def test_write
@d.emit({"a"=>2}, @time)
@d.emit({"a"=>3}, @time)
@d.run
assert_equal "2", @d.instance.redis.hget("test.#{@time}.0", "a")
assert_equal "3", @d.instance.redis.hget("test.#{@time}.1", "a")
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment