Skip to content

Instantly share code, notes, and snippets.

@sonots
Last active December 29, 2015 08:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sonots/7644438 to your computer and use it in GitHub Desktop.
Save sonots/7644438 to your computer and use it in GitHub Desktop.
--- /usr/lib64/fluent/ruby/lib/ruby/gems/1.9.1/gems/fluentd-0.10.39/lib/fluent/plugin/in_forward.rb 2013-09-25 05:23:14.000000000 +0900
+++ /etc/td-agent/plugin/in_forward.rb 2013-11-27 01:11:37.743605655 +0900
@@ -35,6 +35,7 @@
end
def start
+ @msgcount = 0
@loop = Coolio::Loop.new
@lsock = listen
@@ -47,6 +48,7 @@
@loop.attach(@hbr)
@thread = Thread.new(&method(:run))
+ @watcher = Thread.new(&method(:watch))
@cached_unpacker = $use_msgpack_5 ? nil : MessagePack::Unpacker.new
end
@@ -57,6 +59,7 @@
listen_address = (@bind == '0.0.0.0' ? '127.0.0.1' : @bind)
TCPSocket.open(listen_address, @port) {|sock| } # FIXME @thread.join blocks without this line
@thread.join
+ @watcher.join
@lsock.close
end
@@ -77,6 +80,15 @@
# Coolio::UNIXServer.new(@path, Handler, method(:on_message))
#end
+ def watch # Body of the @watcher thread: about once a second, log how many entries were counted, then restart the count.
+ while @thread.alive? # Stop once the event-loop thread (@thread) has ended, so that @watcher.join in shutdown can return.
+ sleep 1
+ count = @msgcount # Snapshot before logging so entries counted while the log call runs are not wiped by the reset.
+ @msgcount = 0
+ $log.info "plugin:in_forward\tcount:#{count}\tunit:second" if count > 0 # Seconds with no entries produce no log line.
+ end
+ end
+
def run
@loop.run
rescue
@@ -119,6 +131,10 @@
# PackedForward
es = MessagePackEventStream.new(entries, @cached_unpacker)
Engine.emit_stream(tag, es)
+ unpacker = @cached_unpacker || MessagePack::Unpacker.new
+ unpacker.feed_each(entries) do |entry|
+ @msgcount +=1
+ end
elsif entries.class == Array
# Forward
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment