Skip to content

Instantly share code, notes, and snippets.

@frsyuki
Last active December 20, 2015 20:39
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save frsyuki/6191818 to your computer and use it in GitHub Desktop.
Save frsyuki/6191818 to your computer and use it in GitHub Desktop.

Fluentdプラグインのv10→v11移植ガイド

概要

Fluentd v11は、v10とプラグインAPIに非互換な部分があります。 ふつうのプラグインなら5ステップで移植できます。

1. 名前空間の変更

module Fluent 名前空間を module Fluentd; module Plugin に変更してください。

Before:

module Fluent
  class MyPlugin .
  end
end

After:

module Fluentd
  module Plugin
    class MyPlugin 
    end
  end
end

2. Engine.emit → collector.emit に変更(Inputプラグインのみ)

これが一番大きな変更です。 Engine.emit(tag, time, record) を、collector.emit(tag, time, record) に変更してください。 collector は、Input クラスのメソッドです。 Output を継承したクラスでは使えないので注意してください。

Engine.emit_stream(tag, es) は、collector.emits(tag, es) に変更してください。

4. $log → Fluentd.log

$logFluentd.log に変更してください。 ロガーのインタフェースに変更はありません。

5. gemパッケージ名の変更

fluent-plugin-XXX を、fluentd-plugin-XXX に変更してください。 gemの依存関係には、gem.add_dependency 'fluentd', ['~> 0.11.0'] を追加してください。

※注意!:まだ fluentd-0.11.0 はリリースされていません。gemはまだリリースしないで良いです

そのほか

Filterプラグイン

inputとoutputを両方行うプラグインは、filterプラグインに書き直すのがおすすめです。 Filter クラスを継承してください。

actor の推奨

以下のケースでは、新API actor を使うと良いです:

  • TCPやUNIXソケットをlistenする
  • 定期的に何かを実行する(タイマー)

def start メソッドの中で actor に対して各種のハンドラを登録すると、終了時の制御や例外制御、エラーロギングなどを自動的にこなしてくれます。

TCP/UNIXソケットサーバ

actor.create_tcp_thread_server(bind, port) {|sock| … } または actor.create_unix_thread_server(path) {|sock| … } で、クライアント毎にスレッドを立ち上げるタイプのサーバを作成できます:

config_param :bind, :string, :default => '0.0.0.0'
config_param :port, :integer, :default => 9191

def start
  actor.create_tcp_thread_server(@bind, @port, &method(:client_thread))
end

def client_thread(sock)
  msg = sock.read
  sock.write msg
ensure
  sock.close
end

タイマー

actor.every(interval) { … ] で、ハンドラを定期的に実行できます:

config_param :interval, :time, :default => 0.5

def start
  actor.every @interval do
    collector.emit("ping", Time.now.to_i, {"ping"=>1}
  end
end

未実装部分

  • TimeSlicedOutput は未実装
  • buf_file は未実装
  • secondary は未実装

Fluentd v11 β のテスト方法

  1. git clone https://github.com/fluent/fluentd.git && cd fluentd
  2. git checkout v11
  3. vi test.conf で設定ファイルを作成:
<worker>
  <source>
    type heartbeat
    tag "test.hb"
    message {"test":"heartbeat"}
  </source>

  <match **>
    type stdout
  </match>

  <label ERROR>
    <filter **>
      type retag
      tag error
    </filter>

    <match **>
      type stdout
    </match>
  </label>
</worker>
  1. ruby -Ipath/to/extra/lib bin/fluentd -c test.conf
@bash0C7
Copy link

bash0C7 commented Aug 10, 2013

config_paramの値

v11の設定ファイルでは、値がJSON parseされてpluginに引き渡る。
そのため、プラグイン側でJSON parseしている部分を取り除く必要がある

-          @dummydata.push(JSON.parse(conf[key]))
+          @dummydata.push(conf[key])

v10

{"type"=>"dummydata_producer",
 "tag"=>"dummy.data",
 "rate"=>"500",
 "dummydata0"=>
  "{\"type\":\"sample\",\"code\":50,\"format\":\"json string allowed\"}",
 "dummydata1"=>"{\"message\":\"other format needed?\"}",
 "dummydata2"=>"{\"comment\":\"N of dummydataN is number and not limited\"}"}

v11

{"type"=>"dummydata_producer",
 "tag"=>"dummy.data",
 "rate"=>500,
 "dummydata0"=>{"type"=>"sample", "code"=>50, "format"=>"json string allowed"},
 "dummydata1"=>{"message"=>"other format needed?"},
 "dummydata2"=>{"comment"=>"N of dummydataN is number and not limited"}}
"dummydata0"
{"type"=>"sample", "code"=>50, "format"=>"json string allowed"}

Fluent::Engineが無い

そのため、Fluent::Engine.nowにて時間をとっている部分を改修する必要がある。
v10のFluent::Engine.nowはTime.now.to_iを返していたので、これに置き換える。

バックグラウンドでずっと能動的に動かしておきたいものactor#backgroundを使うといいらしい!

(まだ試してません)

@sonots
Copy link

sonots commented Aug 10, 2013

補足情報として、Fluentdプラグインのv10→v11移植ガイド (filterぷらぎん編) 書きました。
https://gist.github.com/sonots/6199142

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment