Skip to content

Instantly share code, notes, and snippets.

@oza
Created September 28, 2011 08:14
Show Gist options
  • Save oza/1247325 to your computer and use it in GitHub Desktop.
Save oza/1247325 to your computer and use it in GitHub Desktop.
A Fluentd output plugin for HBase.
module Fluent
  # Buffered output plugin that writes events to an HBase table through the
  # Stargate (HBase REST) client.
  #
  # Each event is serialised by #format as one line:
  #
  #   <ISO8601 time>\t<tag>\t<record as JSON>\n
  #
  # and #write turns every such line back into one HBase row, where the time
  # string is the row key, the tag is the column family and every record key
  # becomes a "<tag>:<key>" column.
  #
  # Configuration parameters:
  #   host      - (required) value handed to Stargate::Client.new
  #   table     - (required) name of the HBase table to write to
  #   localtime - (optional) format row keys in local time
  #   utc       - (optional) format row keys in UTC (the default)
  #
  # NOTE(review): the row key has one-second resolution, so events sharing
  # both tag and second map to the same row — confirm this is acceptable.
  class HbaseOutput < Fluent::BufferedOutput
    Fluent::Plugin.register_output('hbase', self)

    # Loads the libraries the plugin needs. They are required here rather
    # than at file level so they are only loaded when the plugin is used.
    def initialize
      super
      require 'rubygems'
      require 'stargate'
      require 'json' # Hash#to_json / JSON.parse
      require 'time' # Time#iso8601
      @localtime = false
    end

    # Reads and validates the plugin configuration and builds the formatter.
    #
    # conf - configuration object, indexed by parameter name.
    #
    # Raises ConfigError when 'host' or 'table' is missing.
    def configure(conf)
      super

      @host = conf['host']
      unless @host
        raise ConfigError, "'host' parameter is required on hbase output"
      end

      @table = conf['table']
      unless @table
        raise ConfigError, "'table' parameter is required on hbase output"
      end

      # 'utc' wins over 'localtime' when both are given; UTC is the default.
      if conf['utc']
        @localtime = false
      elsif conf['localtime']
        @localtime = true
      end

      if @localtime
        @formatter = Proc.new { |tag, event|
          "#{Time.at(event.time).iso8601}\t#{tag}\t#{event.record.to_json}\n"
        }
      else
        @formatter = Proc.new { |tag, event|
          "#{Time.at(event.time).utc.iso8601}\t#{tag}\t#{event.record.to_json}\n"
        }
      end
    end

    # Creates the Stargate client for the configured host.
    def start
      super
      # FIXME : authentication may be required.
      @client = Stargate::Client.new(@host)
    end

    # Shuts the output down. The base class is torn down first so that
    # anything it still hands to #write finds a usable client.
    def shutdown
      super
      # NOTE(review): not every Stargate client version is known to expose
      # #close, so it is only called when available — confirm against the gem.
      @client.close if @client.respond_to?(:close)
    end

    # Serialises one event into the tab-separated line described on the class.
    #
    # tag   - event tag
    # event - object responding to #time and #record
    #
    # Returns the formatted String.
    def format(tag, event)
      @formatter.call(tag, event)
    end

    # Writes every line of a buffer chunk to HBase as one row.
    #
    # chunk - buffer chunk whose #read returns the concatenated output of
    #         #format.
    #
    # Lines that lack the three tab-separated fields, and records with no
    # keys, are skipped.
    def write(chunk)
      # TODO : Error handling
      chunk.read.each_line do |line|
        # Limit of 3 keeps any tab inside the JSON payload intact.
        key, cf, json_record = line.chomp.split("\t", 3)
        next unless json_record

        columns = build_columns(cf, JSON.parse(json_record))
        next if columns.empty?

        @client.create_row(@table, key, Time.now.to_i, columns)
      end
    end

    private

    # Converts a record Hash into the column list passed to create_row:
    # one {:name => "<cf>:<key>", :value => <string>} Hash per record entry.
    # Non-String values are stored as their JSON representation.
    def build_columns(cf, record)
      record.map do |k, v|
        { :name => "#{cf}:#{k}", :value => (v.is_a?(String) ? v : v.to_json) }
      end
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment