Skip to content

Instantly share code, notes, and snippets.

@repeatedly
Last active August 29, 2015 14:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save repeatedly/5c9f89e14aace5cda195 to your computer and use it in GitHub Desktop.
Save repeatedly/5c9f89e14aace5cda195 to your computer and use it in GitHub Desktop.
Filter example3: record_reformer
require 'socket'
require 'ostruct'
module Fluent
class RecordReformerFilter < Filter
Fluent::Plugin.register_filter('record_reformer', self)
def initialize
super
end
config_param :remove_keys, :string, :default => nil
config_param :keep_keys, :string, :default => nil
config_param :renew_record, :bool, :default => false
config_param :enable_ruby, :bool, :default => true # true for lower version compatibility
BUILTIN_CONFIGURATIONS = %W(type tag output_tag remove_keys renew_record keep_keys enable_ruby)
# To support log_level option implemented by Fluentd v0.10.43
unless method_defined?(:log)
define_method("log") { $log }
end
def configure(conf)
super
@map = {}
conf.each_pair { |k, v|
next if BUILTIN_CONFIGURATIONS.include?(k)
conf.has_key?(k) # to suppress unread configuration warning
@map[k] = v
}
# <record></record> directive
conf.elements.select { |element| element.name == 'record' }.each { |element|
element.each_pair { |k, v|
element.has_key?(k) # to suppress unread configuration warning
@map[k] = v
}
}
if @remove_keys
@remove_keys = @remove_keys.split(',')
end
if @keep_keys
raise Fluent::ConfigError, "out_record_reformer: `renew_record` must be true to use `keep_keys`" unless @renew_record
@keep_keys = @keep_keys.split(',')
end
@placeholder_expander =
if @enable_ruby
# require utilities which would be used in ruby placeholders
require 'pathname'
require 'uri'
require 'cgi'
RubyPlaceholderExpander.new(log)
else
PlaceholderExpander.new(log)
end
@hostname = Socket.gethostname
end
def filter_stream(tag, es)
new_es = MultiEventStream.new
tag_parts = tag.split('.')
tag_prefix = tag_prefix(tag_parts)
tag_suffix = tag_suffix(tag_parts)
placeholders = {
'tag' => tag,
'tags' => tag_parts,
'tag_parts' => tag_parts,
'tag_prefix' => tag_prefix,
'tag_suffix' => tag_suffix,
'hostname' => @hostname,
}
last_record = nil
es.each {|time, record|
last_record = record # for debug log
new_record = reform(@tag, time, record, placeholders)
new_es.add(time, new_record)
#router.emit(new_tag, time, new_record)
}
#chain.next
new_es
rescue => e
log.warn "record_reformer: #{e.class} #{e.message} #{e.backtrace.first}"
log.debug "record_reformer: tag:#{@tag} map:#{@map} record:#{last_record} placeholders:#{placeholders}"
end
private
def reform(tag, time, record, opts)
@placeholder_expander.prepare_placeholders(time, record, opts)
#new_tag = @placeholder_expander.expand(tag)
new_record = @renew_record ? {} : record.dup
@keep_keys.each {|k| new_record[k] = record[k]} if @keep_keys and @renew_record
@map.each_pair {|k, v| new_record[k] = @placeholder_expander.expand(v) }
@remove_keys.each {|k| new_record.delete(k) } if @remove_keys
new_record
end
def tag_prefix(tag_parts)
return [] if tag_parts.empty?
tag_prefix = [tag_parts.first]
1.upto(tag_parts.size-1).each do |i|
tag_prefix[i] = "#{tag_prefix[i-1]}.#{tag_parts[i]}"
end
tag_prefix
end
def tag_suffix(tag_parts)
return [] if tag_parts.empty?
rev_tag_parts = tag_parts.reverse
rev_tag_suffix = [rev_tag_parts.first]
1.upto(tag_parts.size-1).each do |i|
rev_tag_suffix[i] = "#{rev_tag_parts[i]}.#{rev_tag_suffix[i-1]}"
end
rev_tag_suffix.reverse
end
class PlaceholderExpander
attr_reader :placeholders, :log
def initialize(log)
@log = log
end
def prepare_placeholders(time, record, opts)
placeholders = { '${time}' => Time.at(time).to_s }
record.each {|key, value| placeholders.store("${#{key}}", value) }
opts.each do |key, value|
if value.kind_of?(Array) # tag_parts, etc
size = value.size
value.each_with_index { |v, idx|
placeholders.store("${#{key}[#{idx}]}", v)
placeholders.store("${#{key}[#{idx-size}]}", v) # support [-1]
}
else # string, interger, float, and others?
placeholders.store("${#{key}}", value)
end
end
@placeholders = placeholders
end
def expand(str)
str.gsub(/(\${[a-z_]+(\[-?[0-9]+\])?}|__[A-Z_]+__)/) {
log.warn "record_reformer: unknown placeholder `#{$1}` found" unless @placeholders.include?($1)
@placeholders[$1]
}
end
end
class RubyPlaceholderExpander
attr_reader :placeholders, :log
def initialize(log)
@log = log
end
# Get placeholders as a struct
#
# @param [Time] time the time
# @param [Hash] record the record
# @param [Hash] opts others
def prepare_placeholders(time, record, opts)
struct = UndefOpenStruct.new(record)
struct.time = Time.at(time)
opts.each {|key, value| struct.__send__("#{key}=", value) }
@placeholders = struct
end
# Replace placeholders in a string
#
# @param [String] str the string to be replaced
def expand(str)
str = str.gsub(/\$\{([^}]+)\}/, '#{\1}') # ${..} => #{..}
eval "\"#{str}\"", @placeholders.instance_eval { binding }
end
class UndefOpenStruct < OpenStruct
(Object.instance_methods).each do |m|
undef_method m unless m.to_s =~ /^__|respond_to_missing\?|object_id|public_methods|instance_eval|method_missing|define_singleton_method|respond_to\?|new_ostruct_member/
end
end
end
end if defined?(Filter) # Avoid 'uninitialized constant Fluent::Filter' at Fluentd v0.10
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment