Skip to content

Instantly share code, notes, and snippets.

@hivefans
Forked from bash0C7/out_reloadable_copy.rb
Last active March 17, 2020 02:03
Show Gist options
  • Save hivefans/0141dd402a99aa31dbb9 to your computer and use it in GitHub Desktop.
Save hivefans/0141dd402a99aa31dbb9 to your computer and use it in GitHub Desktop.
|-|{"files":{"out_reloadable_copy.rb":{"env":"plain"}},"tag":"bigdata"}
#
# Fluent
#
# Copyright (C) 2011 FURUHASHI Sadayuki
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Fluent
  # Output plugin that copies every event to the <store> outputs declared in a
  # separate config file (+my_config_path+) and re-reads that file when the
  # process receives SIGINT.
  #
  # #emit only enqueues the event; a worker thread (#run) pops the queue and
  # hands each event to the current outputs. While a reload is in progress the
  # worker is stopped and events simply accumulate in the queue.
  class ReloadableCopyOutput < MultiOutput
    Plugin.register_output('reloadable_copy', self)

    config_param :deep_copy, :bool, :default => false
    config_param :my_config_path, :string

    # One queued #emit call: the tag, the event stream and the caller's chain.
    QueuedEmit = Struct.new(:tag, :es, :chain)

    attr_reader :outputs

    # Sets up the event queue and installs the SIGINT handler that triggers a
    # reload of the <store> config.
    #
    # NOTE(review): Signal.trap replaces whatever INT handler was installed
    # earlier in the process -- confirm that taking over SIGINT is intended.
    def initialize
      super
      @q = Queue.new
      @outputs = []
      @thread = nil
      @reload_mutex = Mutex.new
      Signal.trap :INT do
        # Mutex-based operations are not permitted inside a trap handler on
        # Ruby 2.0+, so the handler only spawns a thread and the actual reload
        # runs there.
        Thread.new { reload_my_config }
      end
    end

    # Lets the parent class process +conf+, then builds the outputs from the
    # file named by +my_config_path+.
    def configure(conf)
      super
      load_my_config
    end

    # Starts every output and then the worker thread that drains the queue.
    # Outputs are started first so the worker never hands an event to an
    # output whose #start has not run yet. A failure while starting outputs is
    # logged, not raised, and the worker is started regardless.
    def start
      begin
        @outputs.each { |o| o.start }
      rescue => e
        $log.warn "raises exception: #{e.class}, '#{e.message}'"
      end
      @thread = Thread.new(&method(:run))
    end

    # Stops the worker thread (if one was started) and shuts down every output.
    # Events still sitting in the queue are kept.
    def shutdown
      if @thread
        Thread.kill(@thread)
        # Wait for the worker to be gone before touching the outputs it uses.
        @thread.join
        @thread = nil
      end
      @outputs.each { |o| o.shutdown }
    end

    # Enqueues the event for the worker thread. +chain+ is not advanced here;
    # it is passed on to the output chain built in #deliver.
    def emit(tag, es, chain)
      @q.push QueuedEmit.new(tag, es, chain)
    end

    private

    # Replaces @outputs with outputs freshly built from the config file.
    # @outputs is only assigned once every <store> has been configured, so a
    # broken file leaves the previous list intact.
    def load_my_config
      @outputs = build_outputs
    end

    # Reloads the <store> config: builds the new outputs first and only swaps
    # them in if that succeeds, so a broken config file leaves the current
    # outputs running. Concurrent reload requests are serialized.
    def reload_my_config
      @reload_mutex.synchronize do
        $log.warn 'reloadable_copy: reload my config start'
        new_outputs = begin
          build_outputs
        rescue => e
          $log.warn "raises exception: #{e.class}, '#{e.message}'"
          nil
        end
        swap_outputs(new_outputs) if new_outputs
        $log.warn 'reloadable_copy: reload my config end'
      end
    end

    # Shuts down the current outputs, installs +new_outputs+ and restarts.
    # A failure while shutting down the old outputs is logged and does not
    # prevent the new ones from being started.
    def swap_outputs(new_outputs)
      begin
        shutdown
      rescue => e
        $log.warn "raises exception: #{e.class}, '#{e.message}'"
      end
      @outputs = new_outputs
      start
    end

    # Parses the config file and returns one configured output per <store>
    # element.
    #
    # Raises ConfigError when a <store> element has no 'type'.
    def build_outputs
      stores = read_my_config.elements.select { |e| e.name == 'store' }
      stores.map do |e|
        type = e['type']
        raise ConfigError, "Missing 'type' parameter on <store> directive" unless type
        $log.debug "adding store type=#{type.dump}"
        output = Plugin.new_output(type)
        output.configure(e)
        output
      end
    end

    # Parses the file at +my_config_path+ and returns the parser's result.
    # Files ending in ".rb" go through the Ruby DSL parser, everything else
    # through Config.parse.
    def read_my_config
      path = File.expand_path(@my_config_path)
      fname = File.basename(path)
      basepath = File.dirname(path)
      File.open(path) do |io|
        if fname.end_with?('.rb')
          require 'fluent/config/dsl'
          Config::DSL::Parser.parse(io, File.join(basepath, fname))
        else
          Config.parse(io, fname, basepath, false)
        end
      end
    end

    # Worker loop: pops queued events forever and delivers each one. A failure
    # for one event is logged and does not stop the loop.
    def run
      loop do
        param = @q.pop
        begin
          deliver(param.tag, param.es, param.chain)
        rescue => e
          $log.warn "raises exception: #{e.class}, '#{e.message}, #{param}'"
        end
      end
    end

    # Hands one event stream to the current outputs through an output chain
    # (CopyOutputChain when +deep_copy+ is set, OutputChain otherwise).
    def deliver(tag, es, chain)
      unless es.repeatable?
        # The stream is handed to several outputs, so a non-repeatable stream
        # is first copied into a MultiEventStream.
        m = MultiEventStream.new
        es.each { |time, record| m.add(time, record) }
        es = m
      end
      chain_class = @deep_copy ? CopyOutputChain : OutputChain
      chain_class.new(@outputs, tag, es, chain).next
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment