Skip to content

Instantly share code, notes, and snippets.

@descico
Last active April 6, 2017 13:44
Show Gist options
  • Save descico/25c56fa91768432fe1e7 to your computer and use it in GitHub Desktop.
Save descico/25c56fa91768432fe1e7 to your computer and use it in GitHub Desktop.
Apache Pig 関係

コーディング規約、長年開発・運用・保守をしてきた経験から考えていること

  • alias はデータの流れごとにシンプルに命名する。極端な話 A01 とか B03 とかそういうのにする。
    • ただし、データの流れがわかるように、ある元データの変換であればプレフィックスは固定であとの数値を変える。
    • 例えば LOAD -> FILTER -> GROUP BY -> FOREACH GENERATE であれば、 A01 -> A02 -> A03 -> A04 というように。
    • SPLIT, JOIN したらプレフィックスは変えて、 01 から番号をふっていくべきだろう。
    • その alias が何を指すのか名前でわかるようにするべきでは、という声もあろうが、 Pig の場合はそうでもないし、むしろ命名が長ったらしくなりがち。
  • LOAD するパスは変数にしない。 grep で LOAD しているファイルを探せなくなる。

Pigでファイルの差分を取り出す

  • 特に難しいことはないが、 TextLoader を使えばいい、というだけの話。
A1 = LOAD '/path/to/file1' USING TextLoader() AS (
  line :chararray
);
 
B1 = LOAD '/path/to/file2' USING TextLoader() AS (
  line :chararray
);
 
C1 = JOIN
  B1 BY line LEFT OUTER,
  A1 BY line;
 
C2 = FILTER C1 BY
  A1::line IS NULL;
 
DUMP C2;

DynamicInvokerのサンプル

DEFINE UrlDecode InvokeForString('java.net.URLDecoder.decode', 'String String');
DEFINE UrlEncode InvokeForString('java.net.URLEncoder.encode', 'String String');

A1 = LOAD '/path/to/file' AS (
  url :chararray
);

A2 = FOREACH A1 GENERATE
  UrlEncode(url, 'UTF-8') AS encoded_url;

DUMP A2;

A3 = FOREACH A2 GENERATE
  UrlDecode(encoded_url, 'UTF-8') AS decoded_url;

DUMP A3;
# パラメータ
## pig.maxCombinedSplitSize
* mapper はふつう、ある 1 ブロックを入力とするが、 Pig では、このオプションで指定したサイズ以下or未満におさまるように、入力をまとめることができるようだ
* 単位はバイト。
* 例を上げると、例えば、このパラメータが 64000000 、つまり約 64MB で、ブロックサイズが 128MB 、入力のファイルが 1MB のファイル * 32 個、というような場合、 32 ファイル(かつ、ブロック)まとめて 1 つの入力にして、処理することができる。
* 上の例示ではブロックサイズはあまり関係ないが、もちろん、ブロックサイズが 0.5MB であれば、 32 ファイル 64 ブロックということになる。
* このパラメータで注意しなければならないことは、圧縮したファイルの扱いである。私が運用している環境では、 HDFS では splittable でない方式の圧縮では、 1 mapper が処理するデータが大きくならないよう、ブロック数でファイルを分割して圧縮しているが(これは単に mapper だけをとおす MapReduce で出力を圧縮すれば良いだけ)、ふつう 1 mapper は、非圧縮の 1 ブロックを扱うことになるが、圧縮しており、かつこのパラメータが聞いてしまうと、例えば元サイズの 10% 以下に圧縮できた場合、前述の例のパラメータは 64MB ・ブロックサイズ 128MB という場合は、通常の 5 倍のデータを 1 mapper が処理することになる(非圧縮 1 ブロックのサイズ = 128MB = 圧縮後 10 ファイル, 64MB を閾値とするなら 5 ファイルがまとめられる)。このような場合は、このパラメータを極めて小さくすれば、入力の統合は起こらなくなり、非圧縮のときと同じ map 数が起動することになる(この例では元ファイルのブロック数で分割して圧縮しているため)。
# 0.7.0 での出力が、 0.9.2, 0.11.0 で出力した際に大きく欠損する際にはマルチクエリを無効にすると良い
* 0.7.0 で出力したときと、 0.9.2, 0.11.0 で出力したときとで、後者では前者の結果の数割が欠損する、という事案がある。
* いろいろ試したところ、これはマルチクエリを無効にすると解消されるらしいことがわかった。
* マルチクエリを無効にする方法は、以下のオプションのいずれかを実行時に付け足すことで可能になる。 "-D" のものは最初につける必要があることに注意せよ。
* -Dopt.multiquery=false
* -M
* -no_multiquery
# Hadoop1.x で動いてたものを Hadoop2.x に持ってきたときに Pig のバージョンが違うと挙動が変わる話。
以下のパターンがあった。解決方法がわかった場合はそれも示しておく。
* 0.12.0 では OK だが、 0.9.2, 0.11.0 では NG
* 0.11.0 では NG だが、 0.9.2 だと OK で、 0.12.0 だと syntax error
* これは、 0.12.0 だと恐らく、予約語をエイリアスとして使うことが一切できない、という話だった模様。
* 例えば register という名前のエイリアスは使えない。 register は UDF のライブラリを使う際のディレクティブである。
* 0.11.0 で、 JOIN - USING 'replicated' を指定している処理が Hadoop2.x ではこけるケースがある(常にこけるかどうかは知らない)
* "Error: org.apache.pig.backend.executionengine.ExecException: ERROR 2081: Unable to setup the load function" というのではじまるスタックトレースで、途中で "org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162) Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://main/user/(username)/pigrepl_scope-****" というようなメッセージが書かれていた。
# 0.12.0 では MAPREDUCE の STORE でつくる中間ファイルを自動的に削除しているっぽい
* タイトルのとおりのことが起きていた。
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment