Skip to content

Instantly share code, notes, and snippets.

@kei2100
Created February 15, 2012 12:12
Show Gist options
  • Save kei2100/1835316 to your computer and use it in GitHub Desktop.
Save kei2100/1835316 to your computer and use it in GitHub Desktop.
Fluentd Redis out plugin sample
class RedisOutput < Fluent::BufferedOutput
# Register plugin first. NAME is the name of this plugin
# which is used in the configuration file.
Fluent::Plugin.register_output('redis_sample', self)
def initialize
require 'msgpack'
require 'redis'
super
end
# This method is called before starting.
# 'conf' is a Hash that includes configuration parameters.
# If the configuration is invalid, raise Fluent::ConfigError.
def configure(conf)
super
end
# This method is called when starting.
# Open sockets or files here.
def start
super
@redis = Redis.new(:host => 'localhost', :port => 6379, :thread_safe => true, :db => nil)
end
# This method is called when shutting down.
# Shutdown the thread and close sockets or files here.
def shutdown
super
@redis.quit
end
# This method is called when an event is reached.
# Convert event to a raw string.
def format(tag, time, record)
record.to_msgpack
end
# This method is called every flush interval. write the buffer chunk
# to files or databases here.
# 'chunk' is a buffer chunk that includes multiple formatted
# events. You can use 'data = chunk.read' to get all events and
# 'chunk.open {|io| ... }' to get IO object.
def write(chunk)
@redis.pipelined {
chunk.msgpack_each {|record|
exampleId = record['exampleId']
@redis.rpush(exampleId, record.to_msgpack)
}
}
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment