Skip to content

Instantly share code, notes, and snippets.

@frsyuki
Created May 8, 2012 19:31
Show Gist options
  • Star 8 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save frsyuki/2638703 to your computer and use it in GitHub Desktop.
Save frsyuki/2638703 to your computer and use it in GitHub Desktop.
Fluentd v0.11 の設計案

Fluentd v11 設計案

We made it possible. Next, we'll make it beautiful.

Suffering-oriented programming

コンセプト

  • 柔軟性向上
    • 予想以上に複雑な使い方をしているケースが多かったので、設定ファイルも複雑化して対応する
    • remove_tag_prefix とか add_tag_prefix の乱立をなんとかする
  • 信頼性向上
    • リトライしてもしょうがないエラーと、そうでないエラーを区別する
    • エラー処理を非同期化する
    • listenしているソケットを閉じずに再起動できるようにする
    • 到達保証(at-least-once)をサポート
  • 性能強化
    • マルチコア環境下でのスケーラビリティを向上
  • 互換性維持
    • これ重要

1. 内部ルーティングラベルの導入

背景

ログを加工するときは、out_exec_filterなどのプラグインで加工した後のログを、もう一度別のプラグインにemitして保存したり転送したりすることになる。ここで再emitするときに、tagを変えないと、また加工用のプラグインにマッチしてしまう。そこで、フィルタリング系のプラグインには、add_tag_prefix といった設定オプションがある。

ところが、ストレージにtagも保存しておきたいときに問題が発生する。tagにはFluentdで処理する都合上付けられたprefixが付いる。この邪魔なprefixを消すために、output pluginには remove_tag_prefix といった設定オプションがあったりする。

このように、Fluentd内部でルーティングに使用するtag(意味の無いtag)と、外部から受け取って/読み取ってストレージに保存するtag(意味のあるtag)が、分離されていない。

設計

Fluentd内部のマッチ処理にのみ使用する「内部ルーティングラベル」を導入する。

ログの構造が変わる:

  • インタフェースとしては(Fluentdにログを投げたり、Fluentdから書き出されたログを扱うユーザーの視点では)従来通り tag, time, record の3要素からなるが、
  • Fluentd内部では(Fluentdの設定ファイルを書くオペレータの視点では)label, tag, time, record の4要素とする。

互換性の維持

デフォルトではlabelは空文字列とする。

設定ファイルの書式

こんな感じ:

<source>
  type forward
</source>

# 加工したら@filteredラベルを付けて再emitする
<match access.**>
  type exec_filter
  ...
  label @filtered
</match>

<match error.**>
  type exec_filter
  ...
  label @filtered
</match>

# 加工されたログはここにくる
@filtered:
  <match **>
    type mongo
  </match>

従来の設定ファイルとの互換性も保たれる。複雑なフィルタリング処理をしたい人だけが、labelを使用すれば良い。

out_route の導入

Fluentd内部で複雑な処理ができるように、label付けを行ったり、ログをコピーしたりするプラグインを導入する。設定ファイルはこんな感じ:

<source>
  type forward
</source>

# 入ってきたデータは、とりあえず全部routeプラグインに渡す
<match **>
  type route

  # 全ログを@archiveにコピー
  <match **>
    label @archive
    copy
  </match>

  # production.** を @alert にコピー
  <match production.**>
    label @alert
    copy
  </match>

  # **.events.** を @events にルーティング
  <match **.events.**>
    label @events
  </match>

  # **.error.** を @error にルーティング
  <match **.error.**>
    label @error
  </match>
</match>

@archive:
  ...

@alert:
  ...

@events:
  ...

@error:
  ...

実装案あれこれ

こっちに追記:Fluentd v11 内部ルーティングラベルの実装案あれこれ

2. Error#deterministic? の導入

背景

Fluentdは、ログの書き出しに失敗すると自動的にリトライをする。しかし、リトライしても絶対に成功しない種類のエラーだった場合(例えば、ログの構造が間違っていて処理できないケース)、タイムアウトするまでリトライをし続ける。

リトライをし続けている間、キューが詰まってしまうので、他の正常なログが破棄されてしまう可能性がある。

設計

リトライしても必ず失敗する例外は、Error#deterministic? で true を返す。この場合はリトライをしないで、すぐにログをエラーストリーム(後述)に流す。

リトライすると成功する可能性がある例外は、deterministic? で false を返す。この場合はリトライをして、タイムアウトしたらエラーストリーム(後述)に流す。

互換性の維持

IOError#deterministic? と Errno::XXX#deterministic? は、false を返すようにする。NoMethodError や EncodingError などは、true を返す。

ただし、IOError#deterministic? が、本当にリトライしたら成功する種類のエラーかどうかは分からない(例えば「ファイルがなかった」という例外が投げられたとき、リトライしたら成功するのかは、実装に依存する)。このため、ちゃんと処理するには、プラグイン開発者は Error#deterministic? (や例外クラス)をまじめに実装する必要がある。

課題

deterministic? のメソッド名が分かりにくい。

3. エラーストリームの導入

背景1:エラー処理の意味論

ログは、基本的に「投げっぱなし」にするもので、ログを投げたところエラーだったところで、どうしようも無いことが多い。ほとんど捨てるしかない。言い換えれば、Fluentdのようなログ処理システムでは、エラーを同期的に処理することはできない。

背景2:emitトランザクションの弊害

1回のEngine#emitの呼び出しで、複数のログをemitすることができる。この仕組みは、性能の最適化のために使われている(例えばin_forward)。

しかしこの仕組みは、「そのすべてのログの処理が成功するか、すべてのログの処理が失敗する」ように設計してある。このため、1回のemitの中に、異常な(だが無視しても良い)ログが1つでも混じっていた場合、他のすべての正常なログが破棄されてしまう。

設計

ログの処理中にエラーが発生しても、例外は投げない。その代わりに、別のlabelを付けてemitする(別のプラグインにemitする)。

設定ファイル

<source>
  type forward
  error_label @error
</source>

<match **>
  type my_validator
  label @validated
</match>

@error:
  # emitできなかったログはこっちに来る
  <match ...>
    ...

または、

<source>
  type forward
</source>

<match **>
  type forward
  ...
  error_label @error
</match>

@error:
  # writeできなかったログはこっちに来る
  <match ...>
    ...

課題

Fluentdコアの設計が複雑化する。しょうがない気もする。

4. ProcessManager の導入

背景

現状、プラグインでDetachMultiProcessMixinを使うと、そのプラグインを別プロセスで動作させて、並列性が向上させることができる(下図)。

Fluentd v10 のプロセス構成:

                                                    Detached Input plugin
                                                    +-----------------------+
                                                    |                       |
                                                    | Input plugin          |
                                                    |  |                    |
                                                    |  +--> Virtual Engine  |
                                                    |          |            |
                  +------------------------------+  +----------|------------+
                  |                              |             |
                  | Engine <----------------------<------------+
                  |  |  ^                        |
+------------+    |  |  |                        |   Detached Output plugin
| Supervisor | -- |  |  +- Normal Input plugin   |   +-----------------------+
+------------+    |  +---> Normal Output plugin  |   |                       |
                  |  +---> Virtual output plugin -----> Output plugin        |
                  |  +---> ...                   |   |                       |
                  +------------------------------+   +-----------------------+

しかし、この図から分かるように、Engineプロセスにはすべてのデータが流れる。RubyのCPU処理性能はマルチコアでスケールしないので、Engineプロセスがボトルネックになって性能が伸びない。

設計

Fluentd v11 のプロセス構成案:

_
                        +-------------------------------+
                        |                               |
                        | Engine                        |
                        |  | ^                          |
                        |  | |                          |
                        |  | +--- Input plugin          |
                        |  +----> Virtual output plugin |
                        |          |       |            |
                        +----------|-------|------------+
+----------------+                 | (1)   |
| ProcessManager <-----------------+       | (2)
+----------------+                         |
                     +---------------------+
                     |  +-------------------------------+
                     |  |                               |
                     +--> Engine                        |
                        |  |                            |
                        |  +----> output plugin         |
                        |                               |
                        +-------------------------------+

データの流れ

  1. Input pluginは、同一プロセス内のEngineにデータをemitする
  2. Engineは、マッチしたOutput pluginがどのプロセスで動いているかを判別する。 a. もし同一プロセスであれば、単にそのプラグインにemitする。 b. もし別プロセスであれば:
  3. ProcessManager に問い合わせて、宛先のプロセスに繋がっているファイルディスクリプタをもらう (1)
  4. 宛先のプロセスに直接データを送る (2)

プラグインのマルチプロセス動作

普通は1つのプラグインを1つのプロセスに割り当てて実行するが、とても負荷が高いプラグインについては、複数のプロセスに割り当て、並列動作させる。

課題

実装が大変!子プロセスが不意に死んだ場合の処理や、シグナルハンドラの処理など、考慮すべきことが多い。

5. SocketManager の導入

背景

Fluentdプロセスを再起動すると、プラグインがlistenしているTCPソケットが閉じられてしまう。このため、再起動後に再listenされるまでの間、新しいコネクションを受け付けられなくなってしまう。

ここで、プラグインのshutdownはgracefulに行われるので、再起動時に処理途中のデータが失われることは無いが、その反面時間がかかる。このため、再listenされるまでの時間は長くなりがちで、無視できない。

設計

ソケットは、必ずProcessManagerプロセスでlistenし、そのファイルディスクリプタを子プロセスに転送する。プラグインでは次のようなAPIを使う:

sock = Engine.listen(address, port)  #=> #<TCPServer>
sock = Engine.listen_unix(path)      #=> #<UNIXServer>

SocketManagerは、同じ addressport でlisten要求が来たら、以前にlistenしたファイルディスクリプタをそのまま返す。

課題

ファイルディスクリプタの転送には、UNIXドメインソケットを使うが、JRubyで使えるかどうか分からない。Windowsで動く必要は無さそうだが、JRubyでは動いた方が良さそう。

6. 到達保証の導入

背景

現在のFluentdは、ログが重複しないようにするat-most-onceのサポートしている。このトレードオフとして、サーバが故障した場合などにログが失われることがある。

ところが、ログが重複してもいいから、何が何でも確実に1回はログが転送されて欲しいケース、つまりat-least-onceをサポートして欲しいケースも存在する。

課題

実現方法を思いついていない。

at-least-onceをサポートするメリットが、at-least-onceをサポートすることによって複雑化するデメリットを上回るのか謎。

シンプルな実装方法が求められている。バッファプラグインだけで対応できないだろうか。

@methane
Copy link

methane commented May 9, 2012

Error#deterministic? については、メソッドを追加するよりも例外を分けてしまったほうが良い気がします。

プロセス構成については、 Engine.emit がボトルネックになるようなケースはパックされてないメッセージが大量に
発生して純粋にメソッドの呼び出し回数が増える以外に思いつかないので、本当にEngineをスケールする必要が
あるならInputからマルチプロセス化してプロセス間通信は最低限にする設計がいいと思います。

@frsyuki
Copy link
Author

frsyuki commented May 9, 2012

Error#deterministic?

なるほど。ただ「DeterministicError クラスを継承しているクラスと、そうでないクラス」という分け方をすると、組み込みのIOErrorを deterministic なエラーに変更することができなくなってしまうので、困ります。
「module DeterministicError を include しているクラスと、そうでないクラス」ならOK。こちらの方がきれいな実装ではありますね。

プロセス構成

スケールしないのはEngineそのものと言うよりは、バッファリング処理です。ログの書き出し処理はバッファリングされたチャンクに対して1回走るのに対し、バッファリング処理はメッセージ1つ1つに対して走るので、圧倒的に負荷が高い。特に排他制御のオーバーヘッドが大きいようです(input pluginはマルチスレッドで動作しているので、バッファリングする際に排他制御が必要)。シリアライズ(とbuf_fileの場合はI/O)も重い。

Inputからマルチプロセス化してプロセス間通信は最低限

各プロセスへのプラグインの割り当て方を工夫すると、プロセス間通信を一切発生させずにスケールさせることができるはずです。
一番簡単な方法は、全プロセスそれぞれで全プラグイン実行する方法です。この場合、emitは確実に同一プロセス内のoutput pluginに対して行われるので、プロセス間通信が発生しない。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment