Skip to content

Instantly share code, notes, and snippets.

@frsyuki
Created July 22, 2012 21:44
Show Gist options
  • Save frsyuki/3161109 to your computer and use it in GitHub Desktop.
Save frsyuki/3161109 to your computer and use it in GitHub Desktop.
# httpで受け取ったら問答無用でtagを入れてmongoへ
<source>
type http
port 8891
<filter>
type include_tag
key tag
</filter>
<match>
type mongo
# ...
</match>
</source>
# 9991/tcp で受け取ったらhostnameを入れて...
<source>
type forward
port 9991
<filter **>
type set
key received_at
value_hostname
</filter>
</source>
# myapp.**はフィルタリングしてからs3へ
<match myapp.**>
<filter>
type include_time
key time
format ...
</filter>
<match myapp>
type s3
</match>
</match>
# それ以外はファイルにアーカイブしてから別サーバへ転送
<match>
<fiter>
type copy
<match **>
type file
# ...
</match>
</filter>
<match>
type forward
# ...
</match>
</match>
##
## https://gist.github.com/3088515 からの変更点
##
#
# 1. match のデフォルトの type は "redirect" = 受け取ったデータを内部メッセージバスに emit するプラグイン。これによって <match> をネストできる
#
# 2. <match> や <filter> でマッチパターンは省略できる。デフォルト値は "**"
#

ストリームとチャンク

  • stream := (tag, [(time,record), ...])

    • timerecord のペアのシーケンス
    • シーケンスに終端は存在しない
    • 属性値として tag を持つ
  • chunk := (attributes, [(time,record), ...])

    • timerecord のペアのシーケンス
    • シーケンスに終端は存在が存在する
    • 属性値として attributes を持つ
      • attributes は単なる hash
    • immutable

stream は概念上の存在。tag が異なれば stream が別だという点がポイント。tag が異なれば順序保証は存在しない。

chunk は実装上の存在。切り取られた stream。一度作ったら変更できない。 attributes という任意のデータを持てる点がポイント。UUID やログの開始時刻や終端時刻などを含めることができ、TimeSlicedOutput や冪等性保証などに使われる。

エージェント

  • agent
    • 全プラグインの基底クラス
    • 次のメソッドを持つ:
      • initialize
      • configure(conf)
      • start
      • shutdown (shutdown は start が成功すれば必ず1回以上呼ばれる)
      • close (close は initialize が成功すれば必ず1回以上呼ばれる)

コレクタ

  • collector
    • record または chunk を受け取って処理するモジュール
    • open(tag) メソッドを持ち、 writer を返す
      • writerappend(time, record)write(chunk) メソッドを持つ

バッファ

  • buffer
    • collector を実装したクラス
    • stream を切り取って chunk を作る
    • 要するにバッファプラグイン

ストリーム・ソース

  • stream_source
    • stream を作り出すモジュール
    • 内部 message_bus を1つ所有している(=作り出したストリームの出力先)
    • message_bus に所属している(=親 message_bus

メッセージ・バス

  • agent_group

    • 複数の agentagent_group を所有するクラス
    • collect_agents メソッドを持ち、 ネストしているものを含めてすべての agent を集めて返す
  • message_bus

    • agent_group を継承し、collector を実装したクラス
      • 所有する agent は、collectorstream_source の片方か両方を実装している
    • メッセージを受け取ったら、所有している collector にルーティングする
  • labeled_message_bus

    • message_bus の派生クラス
    • 内部ルーティングラベルを実装しており、ラベルごとに1つ message_bus のインスタンスを持つ

プロセス・マネージャ

  • process_manager

    • processor をいくつか生成する
    • 複数の agent を受け取り、processor に割り当てていく
      • TODO: ここの割り当てアルゴリズムはまだ考えていない
    • processor の起動と終了を管理する
  • processor

    • 子プロセスを作って、割り当てられた agent を実行する

Fluentd

  • labeled_message_busprocess_manager を1つずつ所有している
  • 設定ファイルを読み込んで、labeled_message_bus.configure(conf) を呼び出す
  • labeled_message_bus.collect_agents を呼び出して全 agent を収集し、process_manager に渡す
  • process_manager を実行する

基本プラグインAPI

  • input

    • agent の派生クラスで、stream_source を実装している
  • output

    • input の派生クラスで、collector を実装している
  • buffered_output

    • output の派生クラスで、内部 buffer を持ち、recordchunk に変換してから扱う
  • filter

    • agent の派生クラスで、collectorstream_source を実装している

拡張プラグインAPI

  • buffer_bag
    • 複数のバッファ(構造化されていない)をファイルまたはメモリ上で管理するクラス
    • フォーマット済みのバイナリデータをバッファリングしながら作成していきたい場合に使う
    • out_file などが使う
    • データ構造はキューではなくMap。
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment