Skip to content

Instantly share code, notes, and snippets.

@oza
Created October 28, 2012 23:36
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save oza/3970461 to your computer and use it in GitHub Desktop.
Save oza/3970461 to your computer and use it in GitHub Desktop.
Impala BE Note

Impala BE メモ

本文章は,Impala のメモ の TODO のうち,BackEnd の特にクエリの実行部を読み,文章化したものである.

概要

BackEnd(BE) は単体のデーモンとして動作し,FE 側でパースした結果(Job ID/Plan)に基づいて処理を実行する.FE と BE のやりとりには Thrift が用いられている.

|FE| - パース結果(Job ID/実行Planなど) thrift -> |BE|

実行 Tree/Node について

実行プランは,Tree として表現される.Tree の構成要素を Node と呼ぶ.Node は Expr インタフェースを実装する必要がある. Tree は,FE で構築されて,Thrift 経由でシリアライズされ,BE 側の ExecNode::CreateTree() を実行することで再構築できる.

Node の種類には,実際に処理をおこなう ExecNode とデータの読み込み・スキャンを行う ScanNode がある.Node 一覧は以下の通り.

  1. HDFS_SCAN_NODE : HDFS からデータ読み込みを行う.RCFile/Text/SequentialFile などに対応. 

  2. HBASE_SCAN_NODE : HDFS からデータ読み込みを行う. 

  3. HASH_JOIN_NODE : Hash Join を行う.

  4. AGGREGATION_NODE : MIN/MAX/SUM/COUNT などの集約処理を行う.

  5. SORT_NODE : 入力された RowBatch に対して ソートをおこなう.未実装.

  6. EXCHANGE_NODE : Receiver node for data streams. This simply feeds row batches received from the ata stream into the execution tree(exchange-node.h より引用).

  7. MERGE_NODE : Node that merges the results of its children by materializing their evaluated expressions into row batches. The MergeNode pulls row batches sequentially from its children sequentially(merge-node.h より引用).

ScanNode

HDFS Scan Node

  1. Prepare : Row の集合全体を Table と見立てて初期化したり,LLVM の codegen を行ったりする.
  2. Open : HDFS へ接続を行う.
  3. GetNext : HDFS のブロックを読み込み,Row に変換する.RCFile なら hdfs-scanner-rcfile.cc に実装がある.
  4. Close : HDFS への接続を破棄する.
  5. SetScanRanges : ファイル読み込みのための初期化をおこなう.

ExecNode

Aggregation Node

  1. Prepare: 結果を格納する Tuple の初期化や,LLVM の codegen の初期化を行ったりする.
  2. Open: codegen された IR を JIT する.Row の集合である RowBatch を初期化し,処理できるようにする.
  3. Close : バッファをクリアする.
  4. UpdateAggTuple : 集約処理の本体.FEでパースした結果を利用して UpdateMinSlot/UpdateMaxSlot/UpdateSumSlot を呼び出し,入力値である RowBatch を対象に Min/Max/Sum などの処理を行う.

Node のインタフェース

Node は以下のメソッドを実装する必要がある.

  1. Prepare : 初期化メソッド.リクエストを受け付けた際に呼び出される.多くの Node では,ここで LLVM の Codegen を行う.
  2. Open : 初期化メソッド.HDFS_SCAN_NODE の場合は HDFS に接続しにいく.
  3. GetNext : ScanNode の場合は,次の Row を読み込んでバッファに格納する.
  4. Close : ScanNode の場合はストレージへの接続を解除する.
  5. CreateTree : 実行プランを Tree として構築する.
  6. EvalConjuncts: ??
  7. CodegenEvalConjuncts: ??

Node の処理単位について

各 Node は,処理を Tuple という単位で処理を行う.実際に ExecNode が処理を行う場合は,Tuple クラスを直接用いず,Tuple を抽象化した TupleRow クラスおよび TupleRow の集合である RowBatch クラスを用いて処理を行う.以下に各クラスの概要を示す.

Tuple

1レコード.何を1レコードと見なすかは各ScanNodeが決めている. HBASE_SCAN_NODE の場合は HBase の 1 Rowに相当.API は以下の通り.

  1. Create
  2. Init
  3. DeepCopy
  4. SetNull
  5. SetNotNull
  6. IsNull
  7. GetSlot

TupleRow

Tuple を抽象化したクラスで,これが処理する最小単位.内部で Tuple へのポインタを保持している.API は以下の通り.

  1. SetTuple
  2. DeepCopy

RowBatch

メモリ内にある Tuple の集合State としては in-flight(未処理のtupleがあるかないか)/non in-flight がある.API は以下の通り.

  1. AddRow: 行を追加し,in-flight を false にする.
  2. CommitRows(int n): n 行分だけ
  3. Swap : 現在処理している RowBatch と,引数で与えられた RowBatch に変える. 非同期実行中に,新たな入力が来た際に使うらしい.
  4. まだあるけど省略

TODOs

  1. 読み切れていない Node を全て読み,文章化する.

    1. HBASE_SCAN_NODE
    2. HASH_JOIN_NODE
    3. SORT_NODE
    4. EXCHANGE_NODE
    5. MERGE_NODE

    とりわけ,EXCHANGE Node はデータのやりとりをする際のバッファとして機能するようなので,全体の動きを把握するために理解が必須.

  2. Coordnator を文章化する.

  3. Tree の実行フローを記述する.

  4. LLVM 依存部を理解し,文章化する.特に EvalConjuncts 周り.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment