@hiroyuki-sato
Last active August 29, 2015 14:14
Embulk Plugin Output FluentLoggerRuby

embulk-plugin-output-fluent-logger-ruby

This probably will not work with **embulk-0.4.0** or later.

Someone will probably implement this in Java before long, so...

out:
  type: fluent_logger_ruby
  host: 192.168.xx.xx # default localhost 
  port: 24224         # default 24224
  tag: debug.test
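
As a rough illustration of how these keys resolve, here is a minimal plain-Ruby sketch. `build_task` is a hypothetical helper, not part of Embulk; it only mimics the defaults the plugin declares through `config.param` (`host` and `port` are optional, `tag` is required).

```ruby
# Hypothetical helper, not part of Embulk: mimics the defaults the plugin
# declares via config.param for host, port, and tag.
def build_task(out_config)
  # 'tag' has no default, so a missing value is treated as an error here.
  tag = out_config.fetch('tag') { raise ArgumentError, "'tag' is required" }
  {
    'host' => out_config.fetch('host', 'localhost'),
    'port' => Integer(out_config.fetch('port', 24224)),
    'tag'  => tag
  }
end

# Only 'tag' is given, so host and port fall back to their defaults.
task = build_task('tag' => 'debug.test')
puts task.inspect
```

With only `tag` set, the sketch falls back to `localhost` and `24224`, matching the comments in the config above.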
require 'json'
require 'fluent-logger'

module Embulk
  module Plugin

    class FluentLoggerRuby < OutputPlugin
      # output plugin file name must be: embulk/output_<name>.rb
      Plugin.register_output('fluent_logger_ruby', self)

      # Read the plugin configuration into a task hash and start the run.
      def self.transaction(config, schema, count, &control)
        task = {
          'host' => config.param('host', :string, default: "localhost"),
          'port' => config.param('port', :integer, default: 24224),
          'tag' => config.param('tag', :string)
        }

        resume(task, schema, count, &control)
      end

      # Run the output tasks and collect their commit reports.
      def self.resume(task, schema, count, &control)
        puts "FluentLoggerRuby output started."
        commit_reports = yield(task)
        puts "FluentLoggerRuby output finished. Commit reports = #{commit_reports.to_json}"

        next_config_diff = {}
        return next_config_diff
      end

      # One instance is created per output thread.
      def initialize(task, schema, index)
        puts "FluentLoggerRuby output thread #{index}..."
        super
        @tag = task['tag']
        @logger = Fluent::Logger::FluentLogger.new(nil, :host => task['host'], :port => task['port'])
        @records = 0
      end

      def close
      end

      # Convert each record into a { column name => value } hash and post it.
      def add(page)
        page.each do |record|
          hash = Hash[schema.names.zip(record)]
          unless @logger.post(@tag, hash)
            # TODO error check
          end
          # Counts every record passed to post, including failed posts.
          @records += 1
        end
      end

      def finish
      end

      def abort
      end

      # Report how many records this task handled.
      def commit
        commit_report = {
          "records" => @records
        }
        return commit_report
      end
    end

  end
end
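
The `add` method above leaves its error check as a TODO. The following is a minimal sketch of one way to count failed posts, not the author's implementation. `StubLogger` and `post_page` are hypothetical names introduced for illustration. The stub stands in for `Fluent::Logger::FluentLogger` so the example runs without a fluentd server, and it assumes only that `post(tag, hash)` returns a truthy value on success and a falsy one on failure, which is what the `unless @logger.post(...)` check in the plugin relies on.

```ruby
# Stand-in for Fluent::Logger::FluentLogger (an assumption for this sketch):
# only the post(tag, hash) => true/false contract is mimicked.
class StubLogger
  attr_reader :posted

  # fail_on: optional callable; when it returns true for a hash, post fails.
  def initialize(fail_on: nil)
    @posted = []
    @fail_on = fail_on
  end

  def post(tag, hash)
    return false if @fail_on && @fail_on.call(hash)
    @posted << [tag, hash]
    true
  end
end

# Pair each column name with its value, post the hash, and count failures.
def post_page(logger, tag, column_names, page)
  failed = 0
  page.each do |record|
    hash = Hash[column_names.zip(record)]
    failed += 1 unless logger.post(tag, hash)
  end
  failed
end

# Simulate a failure for any record whose 'id' is nil.
logger = StubLogger.new(fail_on: ->(h) { h['id'].nil? })
page = [[1, 'alice'], [nil, 'bob'], [3, 'carol']]
failed = post_page(logger, 'debug.test', %w[id name], page)
puts "posted=#{logger.posted.size} failed=#{failed}"
# prints "posted=2 failed=1"
```

In a real plugin the failure count could be added to the commit report alongside `records`, so that dropped records show up in the run summary.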