Skip to content

Instantly share code, notes, and snippets.

@kiyoto
Last active August 29, 2015 14:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kiyoto/54305ed8cdb8ff916cbf to your computer and use it in GitHub Desktop.
Save kiyoto/54305ed8cdb8ff916cbf to your computer and use it in GitHub Desktop.
module Fluent
class IfFilter < Filter
Fluent::Plugin.register_filter('if', self)
config_param :if, :string
def configure(conf)
super
@placeholder_expander = RubyPlaceholderExpander.new(log)
then_conf = conf.elements.find { |element| element.name == 'then' }
unless then_conf
raise ConfigError, "<then> directive is required"
end
unless then_conf.has_key?("@type")
raise ConfigError, "<then> directive requires @type"
end
@then_filter = Plugin.new_filter(then_conf["@type"])
@then_filter.router = router
@then_filter.configure(then_conf)
else_conf = conf.elements.find { |element| element.name == 'else' }
@else_filter = nil
if else_conf
unless else_conf.has_key?("@type")
raise ConfigError, "<else> directive requires @type"
end
@else_filter = Plugin.new_filter(else_conf["@type"])
@else_filter.router = router
@else_filter.configure(else_conf)
end
end
def filter_stream(tag, es)
then_events = []
else_events = []
es_markers = []
es.each do |time, record|
@placeholder_expander.prepare_placeholders(time, record)
cond = @placeholder_expander.expand(@if)
log.info "cond is #{cond} with type = #{cond.class}"
log.info "if #{@if}"
if cond
then_events.push([time, record])
es_markers << :then
else
else_events.push([time, record])
es_markers << :else
end
end
then_events = @then_filter.filter_stream(tag, ArrayEventStream.new(then_events)).to_a
if @else_filter
else_events = @else_filter.filter_stream(tag, ArrayEventStream.new(else_events)).to_a
end
new_es = MultiEventStream.new
es_markers.each do |marker|
case marker
when :then
time, record = then_events.pop
new_es.add(time, record)
when :else
time, record = else_events.pop
new_es.add(time, record)
end
end
new_es
end
# Heavily inspired by record_transformer
class RubyPlaceholderExpander
attr_reader :placeholders, :log
def initialize(log)
@log = log
end
# Get placeholders as a struct
#
# @param [Time] time the time
# @param [Hash] record the record
def prepare_placeholders(time, record)
struct = UndefOpenStruct.new(record)
struct.time = Time.at(time)
@placeholders = struct
end
# Replace placeholders in a string
#
# @param [String] str the string to be replaced
def expand(str)
str.gsub!(/\$\{([^}]+)\}/, '\1')
begin
eval str, @placeholders.instance_eval { binding }
rescue => e
log.warn "failed to expand `#{str}`", :error_class => e.class, :error => e.message
log.warn_backtrace
nil
end
end
class UndefOpenStruct < OpenStruct
(Object.instance_methods).each do |m|
undef_method m unless m.to_s =~ /^__|respond_to_missing\?|object_id|public_methods|instance_eval|method_missing|define_singleton_method|respond_to\?|new_ostruct_member/
end
end
end
end
end
<source>
@type dummy
tag test.hello.world
</source>
<filter test.**>
@type if
if ${Random.rand > 0.5}
<then>
@type record_transformer
<record>
__branch then
</record>
</then>
<else>
@type record_transformer
<record>
__branch else
</record>
</else>
</filter>
<match test.**>
type stdout
</match>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment