@yteraoka
Last active December 13, 2015 20:08
A Fluentd output plugin that uses DBI to write to either PostgreSQL or MySQL https://github.com/yteraoka/fluent-plugin-dbi
# <match dbi.test>
#   type dbi
#   #dsn DBI:Pg:dbname:127.0.0.1
#   dsn DBI:Mysql:dbname:127.0.0.1
#   db_user username
#   db_pass password
#   keys host,time_m,method,uri,protocol,status
#   query insert into access_log (host, time, method, uri, protocol, status) values (?, ?, ?, ?, ?, ?)
# </match>
module Fluent
  class DbiOutput < BufferedOutput
    Plugin.register_output('dbi', self)

    config_param :dsn, :string
    config_param :keys, :string
    config_param :db_user, :string
    config_param :db_pass, :string
    config_param :query, :string

    def initialize
      super
      require 'dbi'
    end

    def configure(conf)
      super
      # "host,time_m,..." => ["host", "time_m", ...]
      @keys = @keys.split(",")
    end

    # Serialize each event so that write can restore it with msgpack_each.
    def format(tag, time, record)
      [tag, time, record].to_msgpack
    end

    # Insert every event of the chunk inside a single transaction.
    def write(chunk)
      begin
        dbh = DBI.connect(@dsn, @db_user, @db_pass)
        dbh['AutoCommit'] = false
        sth = dbh.prepare(@query)
        chunk.msgpack_each { |tag, time, record|
          # Fall back to the event time and tag when the record lacks them.
          record['time'] = time unless record.key?('time')
          record['tag'] = tag unless record.key?('tag')
          # Bind values in the order given by the keys parameter.
          values = @keys.map { |key| record[key] }
          sth.execute(*values)
        }
      rescue
        dbh.rollback if dbh
        raise
      else
        sth.finish
        dbh.commit
      ensure
        dbh.disconnect if dbh
      end
    end
  end
end
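
The way write turns one event into the bind values for the query can be tried without Fluentd or a database. The following is only a minimal sketch: build_values is a hypothetical helper that does not exist in the plugin, the sample record and keys are made up, and unlike the plugin it works on a copy instead of modifying the record in place.

```ruby
# Hypothetical helper, not part of the plugin: builds the bind values for one event.
def build_values(keys, tag, time, record)
  row = record.dup                           # assumption: leave the caller's record untouched
  row['time'] = time unless row.key?('time') # fall back to the event time
  row['tag'] = tag unless row.key?('tag')    # fall back to the event tag
  keys.map { |key| row[key] }                # keys missing from the record yield nil
end

# Made-up sample data, split the same way configure splits the keys parameter.
keys = 'host,time,method,status'.split(',')
record = { 'host' => '192.0.2.1', 'method' => 'GET', 'status' => '200' }

values = build_values(keys, 'dbi.test', 1360000000, record)
p values
# prints ["192.0.2.1", 1360000000, "GET", "200"]
```

The order of keys has to match the order of the ? placeholders in query, since the values are passed to execute positionally.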