Created
February 15, 2012 12:12
kei2100/1835316
Fluentd Redis out plugin sample
class RedisOutput < Fluent::BufferedOutput
  # Register the plugin first. The first argument ('redis_sample') is the
  # plugin name used in the configuration file.
  Fluent::Plugin.register_output('redis_sample', self)

  def initialize
    require 'msgpack'
    require 'redis'
    super
  end

  # This method is called before starting.
  # 'conf' is a Hash that includes configuration parameters.
  # If the configuration is invalid, raise Fluent::ConfigError.
  def configure(conf)
    super
  end

  # This method is called when starting.
  # Open sockets or files here.
  # The Redis connection settings are hard-coded in this sample.
  def start
    super
    @redis = Redis.new(:host => 'localhost', :port => 6379, :thread_safe => true, :db => nil)
  end

  # This method is called when shutting down.
  # Shut down threads and close sockets or files here.
  def shutdown
    super
    @redis.quit
  end

  # This method is called when an event arrives.
  # Convert the event to a raw string. Only the record is serialized;
  # the tag and time are discarded.
  def format(tag, time, record)
    record.to_msgpack
  end

  # This method is called every flush interval. Write the buffer chunk
  # to files or databases here.
  # 'chunk' is a buffer chunk that includes multiple formatted
  # events. You can use 'data = chunk.read' to get all events and
  # 'chunk.open {|io| ... }' to get an IO object.
  # Here each record is appended (RPUSH) to the Redis list named by its
  # 'exampleId' field, with all commands sent in one pipeline.
  def write(chunk)
    @redis.pipelined do
      chunk.msgpack_each do |record|
        example_id = record['exampleId']
        @redis.rpush(example_id, record.to_msgpack)
      end
    end
  end
end
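The effect of `write` can be tried without a Fluentd or Redis installation. The following is only a minimal sketch under stated assumptions: `FakeRedis` and `push_records` are hypothetical names that are not part of the plugin, `FakeRedis` implements nothing beyond `rpush`, and JSON stands in for MessagePack so that the snippet needs no gems. It shows how records sharing an 'exampleId' end up in the same list, in arrival order.

```ruby
require 'json'

# Hypothetical in-memory stand-in for the Redis client. It implements only
# the rpush call that the plugin's write method uses.
class FakeRedis
  attr_reader :lists

  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
  end

  # Append a value to the list stored at key and return the new list
  # length, as the Redis RPUSH command does.
  def rpush(key, value)
    @lists[key] << value
    @lists[key].length
  end
end

# Mirrors the body of write(chunk): each record is appended to the list
# named by its 'exampleId' field. JSON is an assumption made for this
# sketch; the plugin itself serializes with record.to_msgpack.
def push_records(redis, records)
  records.each do |record|
    example_id = record['exampleId']
    redis.rpush(example_id, JSON.generate(record))
  end
end

redis = FakeRedis.new
records = [
  { 'exampleId' => 'a', 'value' => 1 },
  { 'exampleId' => 'b', 'value' => 2 },
  { 'exampleId' => 'a', 'value' => 3 }
]
push_records(redis, records)

redis.lists.each do |key, values|
  puts "#{key}: #{values.length} entries"
end
```

With these three records, list 'a' receives two entries and list 'b' receives one. A record without an 'exampleId' field would be pushed with a nil key, so a real plugin would probably want to skip or reject such records.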