@holysugar
Created September 11, 2015 08:21
A fluentd filter plugin that dynamically writes out BigQuery schema files (meant to be used together with the bigquery output plugin).
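The core idea is that the filter writes one schema file per distinct value of the key field (`op` by default), and only the first time that value is seen by the running process. Every record passes through unchanged. The sketch below shows that write-once-per-key pattern without fluentd. The class name `SchemaFileCache` and the `schema_for` block are hypothetical stand-ins for the plugin's own `create_structure_from_record`.

```ruby
require 'json'
require 'tmpdir'
require 'fileutils'

# Hypothetical helper illustrating the write-once-per-key pattern.
class SchemaFileCache
  attr_reader :writes

  def initialize(dir, key_name = 'op', &schema_for)
    @dir = dir
    @key_name = key_name
    @schema_for = schema_for   # block that builds a schema from a record
    @cache = {}                # key value => schema already written
    @writes = 0
    FileUtils.mkdir_p(@dir)
  end

  # Returns the record unchanged. Writes bq_schema_<key>.json only the first
  # time <key> is seen, so the first record with that key decides the schema.
  def call(record)
    key = record[@key_name]
    unless @cache.key?(key)
      schema = @schema_for.call(record)
      File.write(File.join(@dir, "bq_schema_#{key}.json"), JSON.pretty_generate(schema))
      @cache[key] = schema
      @writes += 1
    end
    record
  end
end

Dir.mktmpdir do |dir|
  cache = SchemaFileCache.new(dir) { |r| r.keys.map { |k| { "name" => k, "type" => "STRING" } } }
  cache.call({ "op" => "login", "user" => "alice" })
  cache.call({ "op" => "login", "user" => "bob" })      # same key: no new file
  cache.call({ "op" => "purchase", "item" => "sword" })
  p Dir.children(dir).sort  # prints ["bq_schema_login.json", "bq_schema_purchase.json"]
  p cache.writes            # prints 2
end
```

Because the cache is keyed only on the key value, a later record with the same key but extra fields does not update the file; the same holds for the plugin, and the cache is lost when the process restarts.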
```ruby
require 'fluent/filter'
require 'fileutils'
require 'json'

# Writes a BigQuery schema file (bq_schema_<key>.json) for each distinct value
# of record[key_name], inferring column types from the first such record.
class Fluent::BqSchemaFilter < Fluent::Filter
  Fluent::Plugin.register_filter('bq_schema', self)

  config_param :key_name, :string, default: 'op'
  config_param :schema_dir, :string, default: '/tmp/td-agent/'
  # Explicit type overrides: lists of field names forced to a given BigQuery type.
  config_param :field_string, :array, default: nil
  config_param :field_integer, :array, default: nil
  config_param :field_float, :array, default: nil
  config_param :field_boolean, :array, default: nil
  config_param :field_timestamp, :array, default: nil

  def initialize
    super
    @cache = {}         # key value => schema already written
    @typed_fields = {}  # field name => BigQuery type from the field_* params
  end

  def start
    super
    FileUtils.mkdir_p(@schema_dir)
    %w(string integer float boolean timestamp).each do |type|
      raw_fields = instance_variable_get("@field_#{type}")
      next unless raw_fields
      raw_fields.each do |field|
        @typed_fields[field] = type.upcase
      end
    end
  end

  # Records pass through unchanged; a schema file is written only the first
  # time a given key value is seen by this process.
  def filter(tag, time, record)
    key = record[@key_name]
    unless @cache[key]
      structure = create_structure_from_record(record)
      File.write(File.join(@schema_dir, "bq_schema_#{key}.json"), JSON.pretty_generate(structure))
      @cache[key] = structure
    end
    record
  end

  private

  def create_structure_from_record(record)
    record.map do |k, v|
      # Explicit field_* overrides win; otherwise infer the type from the Ruby value.
      type = @typed_fields[k] ||
        case v
        when /\A[0-9]+\z/ then "INTEGER"   # digit-only strings
        when String       then "STRING"
        when Integer      then "INTEGER"
        when Float        then "FLOAT"
        when Time         then "TIMESTAMP"
        else                   "STRING"    # true/false, nil, Hash, Array, ...
        end
      { "name" => k, "type" => type }
    end
  end
end
```
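The type rules in `create_structure_from_record` can be tried outside fluentd. The sketch below is a standalone copy of that logic; the method name `infer_bq_schema` and the sample record are illustrative only, not part of the plugin or of fluentd's API. An explicit mapping (like the one built from the `field_*` parameters) takes precedence; otherwise digit-only strings become INTEGER, other strings STRING, Integer/Float/Time become INTEGER/FLOAT/TIMESTAMP, and everything else falls back to STRING.

```ruby
require 'json'

# Standalone copy of the plugin's type inference (hypothetical helper name).
# typed_fields maps a field name to an explicit BigQuery type, e.g. {"admin" => "BOOLEAN"}.
def infer_bq_schema(record, typed_fields = {})
  record.map do |name, value|
    type = typed_fields[name] ||
      case value
      when /\A[0-9]+\z/ then "INTEGER"   # Regexp#=== is false for non-strings
      when String       then "STRING"
      when Integer      then "INTEGER"
      when Float        then "FLOAT"
      when Time         then "TIMESTAMP"
      else                   "STRING"    # true/false, nil, Hash, Array, ...
      end
    { "name" => name, "type" => type }
  end
end

record = { "op" => "login", "user_id" => "12345", "score" => 1.5, "admin" => true }
schema = infer_bq_schema(record, { "admin" => "BOOLEAN" })
puts JSON.pretty_generate(schema)
p schema.map { |f| f["type"] }  # prints ["STRING", "INTEGER", "FLOAT", "BOOLEAN"]
```

Without the override, `"admin" => true` would be typed STRING, which is why booleans need `field_boolean`. The digit-only regex also means strings such as `"-5"` or `"1.5"` are typed STRING, so signed or decimal numbers sent as strings need `field_integer` or `field_float`.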