Last active: August 29, 2015 14:05
-
-
Save repeatedly/0ed3e2b4016b4045640a to your computer and use it in GitHub Desktop.
Filter example 1: grep filter
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Fluentd filter plugin that keeps only the events whose fields match every
# configured `regexpN` pattern and match none of the configured `excludeN`
# patterns.
#
# Each numbered `regexpN` / `excludeN` parameter is written as
# "<record key> <pattern>" and is split on the first single space. The obsolete
# `input_key` + `regexp` / `exclude` parameters are still honored; their pattern
# is registered under `input_key`.
class Fluent::GrepFilter < Fluent::Filter
  Fluent::Plugin.register_filter('grep', self)

  # Highest N accepted for the numbered regexpN / excludeN parameters.
  REGEXP_MAX_NUM = 20

  config_param :input_key, :string, :default => nil # obsolete
  config_param :regexp, :string, :default => nil # obsolete
  config_param :exclude, :string, :default => nil # obsolete
  # NOTE(review): declared but not read anywhere in this class; invalid byte
  # sequences are always replaced with '?' before matching, whatever this says.
  config_param :replace_invalid_sequence, :bool, :default => false
  (1..REGEXP_MAX_NUM).each { |i| config_param :"regexp#{i}", :string, :default => nil }
  (1..REGEXP_MAX_NUM).each { |i| config_param :"exclude#{i}", :string, :default => nil }

  # for test
  attr_reader :regexps
  attr_reader :excludes

  # To support log_level option implemented by Fluentd v0.10.43
  unless method_defined?(:log)
    define_method("log") { $log }
  end

  # Compiles the configured patterns into @regexps and @excludes, each a
  # Hash of record key => Regexp.
  #
  # @param conf plugin configuration (Hash-like; numbered parameters are read
  #   with conf["regexpN"] / conf["excludeN"])
  # @raise [Fluent::ConfigError] if a numbered parameter has no pattern after
  #   its key, or repeats a key already registered for the same kind
  def configure(conf)
    super
    @regexps = compile_patterns(conf, 'regexp', @regexp)
    @excludes = compile_patterns(conf, 'exclude', @exclude)
  end

  # Returns a new Fluent::MultiEventStream holding only the events of +es+
  # whose record passes #keep?.
  #
  # A record that raises while being matched is logged and dropped on its own,
  # so the remaining events are still emitted, and the return value is always
  # an event stream.
  def filter_stream(tag, es)
    result_es = Fluent::MultiEventStream.new
    es.each do |time, record|
      begin
        result_es.add(time, record) if keep?(record)
      rescue => e
        warn_error(e)
      end
    end
    result_es
  rescue => e
    # Iterating +es+ itself failed: stay best-effort and return what was kept.
    warn_error(e)
    result_es
  end

  private

  # Builds a record key => Regexp Hash from the obsolete single pattern
  # (stored under @input_key) plus the numbered "<prefix>1".."<prefix>20" params.
  def compile_patterns(conf, prefix, obsolete_pattern)
    patterns = {}
    patterns[@input_key] = Regexp.compile(obsolete_pattern) if @input_key && obsolete_pattern
    (1..REGEXP_MAX_NUM).each do |i|
      name = "#{prefix}#{i}"
      next unless conf[name]
      key, pattern = conf[name].split(/ /, 2)
      raise Fluent::ConfigError, "#{name} does not contain 2 parameters" unless pattern
      raise Fluent::ConfigError, "#{name} contains a duplicated key, #{key}" if patterns[key]
      patterns[key] = Regexp.compile(pattern)
    end
    patterns
  end

  # True when every @regexps pattern matches its field and no @excludes
  # pattern matches its field. A missing field is matched as "" (nil.to_s).
  def keep?(record)
    @regexps.all? { |key, regexp| match(regexp, record[key].to_s) } &&
      @excludes.none? { |key, exclude| match(exclude, record[key].to_s) }
  end

  # Logs an exception's message and its backtrace as two warnings.
  def warn_error(e)
    log.warn e.message
    log.warn Array(e.backtrace).join(', ')
  end

  # Matches +string+ against +regexp+ and returns MatchData or nil.
  # If the string contains an invalid byte sequence, the bad bytes are
  # replaced with '?' and the match is retried exactly once.
  def match(regexp, string)
    replaced = false
    begin
      regexp.match(string)
    rescue ArgumentError => e
      raise if replaced || !e.message.start_with?('invalid byte sequence in')
      string = replace_invalid_byte(string)
      replaced = true
      retry
    end
  end

  # Returns a copy of +string+, in its original encoding, with invalid and
  # undefined bytes replaced by '?'.
  def replace_invalid_byte(string)
    original_encoding = string.encoding
    # String#encode to the *same* encoding is a no-op that skips validation,
    # so round-trip through a different encoding to force the replacement.
    temporal_encoding = original_encoding == Encoding::UTF_8 ? Encoding::UTF_16BE : Encoding::UTF_8
    # Options are passed as keywords; a positional Hash raises ArgumentError on Ruby 3.
    cleaned = string.encode(temporal_encoding, original_encoding, invalid: :replace, undef: :replace, replace: '?')
    cleaned.encode(original_encoding)
  end
end if defined?(Fluent::Filter) # Avoid 'uninitialized constant Fluent::Filter' at Fluentd v0.10
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.