** embulk-0.4.0 ** 以降だと動かないと思います。
そのうちどなたかがjavaで実装しそうなので..
out:
type: fluent_logger_ruby
host: 192.168.xx.xx # default localhost
port: 24224 # default 24224
tag: debug.test
require 'fluent-logger' | |
module Embulk | |
module Plugin | |
class FluentLoggerRuby < OutputPlugin | |
# output plugin file name must be: embulk/output_<name>.rb | |
Plugin.register_output('fluent_logger_ruby', self) | |
def self.transaction(config, schema, count, &control) | |
task = { | |
'host' => config.param('host', :string, default: "localhost"), | |
'port' => config.param('port', :integer, default: 24224), | |
'tag' => config.param('tag', :string) | |
} | |
resume(task, schema, count, &control) | |
end | |
def self.resume(task, schema, count, &control) | |
puts "FluentLoggerRuby output started." | |
commit_reports = yield(task) | |
puts "FluentLoggerRuby output finished. Commit reports = #{commit_reports.to_json}" | |
next_config_diff = {} | |
return next_config_diff | |
end | |
def initialize(task, schema, index) | |
puts "FluentLoggerRuby output thread #{index}..." | |
super | |
@tag = task['tag'] | |
@logger = Fluent::Logger::FluentLogger.new(nil, :host => task['host'], :port => task['port']) | |
@records = 0 | |
end | |
def close | |
end | |
def add(page) | |
page.each do |record| | |
hash = Hash[schema.names.zip(record)] | |
unless @logger.post(@tag, hash) | |
# TODO error check | |
end | |
@records += 1 | |
end | |
end | |
def finish | |
end | |
def abort | |
end | |
def commit | |
commit_report = { | |
"records" => @records | |
} | |
return commit_report | |
end | |
end | |
end | |
end |