Skip to content

Instantly share code, notes, and snippets.

@repeatedly
Last active August 29, 2015 14:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save repeatedly/0ed3e2b4016b4045640a to your computer and use it in GitHub Desktop.
Save repeatedly/0ed3e2b4016b4045640a to your computer and use it in GitHub Desktop.
Filter example 1: grep filter
# Fluentd filter plugin that keeps only the records matching every
# `regexpN` pattern and none of the `excludeN` patterns.
#
# Each numbered parameter has the form "<record key> <regular expression>".
# Record values are converted with #to_s before matching, so a missing key is
# matched as the empty string.
class Fluent::GrepFilter < Fluent::Filter
  Fluent::Plugin.register_filter('grep', self)

  # Highest N accepted for the numbered regexpN / excludeN parameters.
  REGEXP_MAX_NUM = 20

  config_param :input_key, :string, :default => nil # obsolete
  config_param :regexp, :string, :default => nil # obsolete
  config_param :exclude, :string, :default => nil # obsolete
  # NOTE(review): this parameter is declared but never read in this class;
  # invalid byte sequences are always replaced before matching.
  config_param :replace_invalid_sequence, :bool, :default => false
  (1..REGEXP_MAX_NUM).each { |i| config_param :"regexp#{i}", :string, :default => nil }
  (1..REGEXP_MAX_NUM).each { |i| config_param :"exclude#{i}", :string, :default => nil }

  # for test
  attr_reader :regexps
  attr_reader :excludes

  # To support log_level option implemented by Fluentd v0.10.43
  unless method_defined?(:log)
    define_method("log") { $log }
  end

  # Compiles the include (regexpN plus obsolete input_key/regexp) and
  # exclude (excludeN plus obsolete input_key/exclude) patterns.
  #
  # @param conf the plugin configuration section, indexed by parameter name
  # @raise [Fluent::ConfigError] when a numbered parameter lacks a pattern
  #   or reuses a key already configured for the same kind
  def configure(conf)
    super
    @regexps = compile_patterns(conf, 'regexp', @regexp)
    @excludes = compile_patterns(conf, 'exclude', @exclude)
  end

  # Returns a new stream holding only the records that pass the filter.
  #
  # A record whose matching raises is logged and dropped on its own; the
  # remaining records are still processed. (Previously a single failure made
  # this method return nil and lose the whole batch.)
  #
  # @param tag event tag (not used by this filter)
  # @param es event stream yielding (time, record) pairs
  # @return [Fluent::MultiEventStream] the kept events
  def filter_stream(tag, es)
    result_es = Fluent::MultiEventStream.new
    es.each do |time, record|
      begin
        result_es.add(time, record) if keep_record?(record)
      rescue => e
        log.warn e.message
        log.warn e.backtrace.join(', ')
      end
    end
    result_es
  end

  private

  # Builds a { record key => Regexp } map for one pattern kind.
  #
  # @param conf the plugin configuration section
  # @param prefix "regexp" or "exclude"; numbered params are "#{prefix}1".."#{prefix}N"
  # @param legacy_pattern the obsolete single pattern, applied to @input_key
  # @return [Hash{String => Regexp}]
  def compile_patterns(conf, prefix, legacy_pattern)
    patterns = {}
    patterns[@input_key] = Regexp.compile(legacy_pattern) if @input_key && legacy_pattern
    (1..REGEXP_MAX_NUM).each do |i|
      param = "#{prefix}#{i}"
      next unless conf[param]
      # Split on the first single space only: the pattern itself may contain spaces.
      key, pattern = conf[param].split(/ /, 2)
      raise Fluent::ConfigError, "#{param} does not contain 2 parameters" unless pattern
      raise Fluent::ConfigError, "#{param} contains a duplicated key, #{key}" if patterns[key]
      patterns[key] = Regexp.compile(pattern)
    end
    patterns
  end

  # True when the record matches every include pattern and no exclude
  # pattern. Both checks short-circuit on the first decisive pattern.
  def keep_record?(record)
    @regexps.all? { |key, regexp| match(regexp, record[key].to_s) } &&
      @excludes.none? { |key, exclude| match(exclude, record[key].to_s) }
  end

  # Matches +string+ against +regexp+, tolerating invalid byte sequences.
  #
  # If the string contains bytes invalid for its encoding, it is sanitized
  # once with #replace_invalid_byte and matched again. Any other
  # ArgumentError, or a failure on the second attempt, propagates. This
  # bounds the retry, which was previously unbounded.
  #
  # @return [MatchData, nil] truthy on a match
  def match(regexp, string)
    regexp.match(string)
  rescue ArgumentError => e
    raise unless e.message.start_with?("invalid byte sequence in")
    regexp.match(replace_invalid_byte(string))
  end

  # Returns a copy of +string+, in its original encoding, with invalid or
  # undefined byte sequences replaced by '?'.
  #
  # Transcoding to the same encoding is a no-op in Ruby, so the string is
  # routed through a different temporary encoding to force the replacement.
  def replace_invalid_byte(string)
    original_encoding = string.encoding
    temporal_encoding = (original_encoding == Encoding::UTF_8 ? Encoding::UTF_16BE : Encoding::UTF_8)
    # Options are passed as literal keywords: a positional Hash is rejected
    # by String#encode under Ruby 3 keyword-argument separation.
    transcoded = string.encode(temporal_encoding, original_encoding,
                               invalid: :replace, undef: :replace, replace: '?')
    transcoded.encode(original_encoding)
  end
end if defined?(Fluent::Filter) # Avoid 'uninitialized constant Fluent::Filter' at Fluentd v0.10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment