Skip to content

Instantly share code, notes, and snippets.

@wnoguchi
Forked from tsuda7/gist:a5c058aab86c909ae005
Last active September 18, 2015 11:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wnoguchi/08cda16bbeb1b40d5ab2 to your computer and use it in GitHub Desktop.
Save wnoguchi/08cda16bbeb1b40d5ab2 to your computer and use it in GitHub Desktop.
Hadoop 徹底入門(第2版) 輪読会メモ
  • お膳立て
    • 環境は Vagrant から起動する CentOS
    • JDK7
    • Hadoop ディストリビューションは CDH

Chapter 1 "Hadoop の基礎知識"

Hadoop とは

一言で言うと…

「大量のデータを処理するための並列分散処理ソフトウェア」

分散ファイルシステム(HDFS)と、並列分散処理を実現するフレームワーク(MapReduce)の2つから成る

既存のバッチ処理の置き換えのみならず、「今までできなかったことや諦めていたことが出来るようになる」

誕生の経緯

Google 社の論文 “The Google File Ssytem”(2003), “MapReduce: Simplified Data Processing on Large Clusters”(2004) をもとに OSS として実装を始めたのが Hadoop プロジェクトの始まり

Hadoop の適用領域

  • 数百GB 〜 数TBなどの大量データのバッチ処理に適している
  • 拡張性に優れる(スケールアウトでの処理性能向上)
  • 柔軟なデータ構造に対応

サーバ構成

  • HDFS: NameNode(マスタ:1台)、DataNode(スレーブ:複数台)
    • 図1.10, 図1.11(P.22)
  • MapReduce: JobTracker(マスタ:1台)、TaskTracker(スレーブ:複数台)
    • 図1.13, 図1.14(P.24, P.26)、"MapReduce 処理の流れ"
  • マスタサーバは高性能・高信頼のマシンを利用する
  • スレーブサーバは RAID なしの SATA ディスクを利用することが多い

Chapter 2 "MapReduce アプリケーションの活用例"

活用例

ブログのアクセス集計(P.34)

  • 日で分割されたアクセスログに対し、ブログごとのユニークユーザー数が知りたい。不正アクセスは取り除く
  • 単純な MapReduce ジョブに分割するパターン
    • フィルタリングと、ユニークユーザーの洗い出し
    • ブログごとにユニークユーザー数を集計

似ている人を見つける(P.38)

  • SNS で同じ特徴を持つユーザーをグルーピングする

検索エンジンインデックスの作成(P.40)

画像データの分散処理(P.42)

  • Reduce が必要ないパターン

MapReduce で実現できる処理の特徴と活用のポイント

  • map 関数と reduce 関数を適切に使い分ける
    • map は順序制御できないため、データのクレンジング処理や、データのフィルタリングなど、前処理に向く
    • reduce はキーに対してひもづいた複数のデータが渡される。データもキーでソートされている。集約処理や、時系列データの処理に向く
  • 集約単位や分割の軸を考慮し、中間データのキーを選択する
    • Shuffle フェーズでは同じキーを持つ中間データ同士がまとめられるため、どのような軸で集約・分類を行うのかを考慮する
  • 複雑な処理は単純な MapReduce ジョブに分割する
  • map 関数だけで処理が完結するかどうか検討する

Chapter 3 "Hadoop の導入”

動作環境

  • Hadoop のディストリビューション

    • Apache Hadoop
    • Cloudera's Distribution including Apache Hadoop(CDH)
      • RPM, deb などのパッケージを用いて簡単に導入できる
      • 提供されているコンポーネントの組み合わせで動作が確認されている
    • Hortonworks Data Platform(HDP)
  • MapReduce は 1.0 系と 2.0 系が存在する

    • ジョブ管理方法が大きく異なる
      • 1.0 系では JobTracker, TaskTracker からなるフレームワーク
      • 2.0 系では、YARN(Yet Another Resource Negotiator, 第22章にて解説)が採用されている
  • Hadoop の動作モード

    • ローカルモード
      • 1台、HDFS 未使用
      • MapReduce の動作検証に利用する
    • 疑似分散モード
      • 1台、HDFS 使用
      • HDFS, MapReduce の動作検証、Hadoop アプリケーションの機能検証に利用する
    • 完全分散モード
      • 複数台、HDFS 使用
      • 商用環境の構築、ノード間通信を含む HDFS, MapReduce の動作検証、パフォーマンスなどの非機能要件の検証

インストールから動作確認

  • JDK は 1.7 も対応(書籍では1.6)
  • HDFS の設定でつまづくことが多いため、先に4章を読んでおくとよい

Tips

IP アドレスからホスト名が逆引きできないとエラーとなる

[/var/log/hadoop-hdfs/hadoop-hdfs-datanode-vagrant-centos65.vagrantup.com.log]
2014-02-16 14:54:04,691 FATAL org.apache.hadoop.hdfs.server.datanode.DataNode: Exception in secureMain
java.net.UnknownHostException: vagrant-centos65.vagrantup.com: vagrant-centos65.vagrantup.com
  • 下記設定を加える(192.168.33.10 は Vagrant の初期ローカルIP):
[/etc/hosts]
(中略)
192.168.33.10 vagrant-centos65.vagrantup.com

hdfs コマンドについて

  • 書籍中では sudo コマンドが多用されており、本来のコマンドが分かりづらい。。
[誤]
[someuser@vagrant]$ sudo -u hdfs dfs fs -mkdir ...
[正]
[someuser@vagrant]$ sudo -u hdfs hdfs dfs -mkdir ...
  • hdfs ユーザでログインしていれば、以下の通り:
[hdfs@vagrant] $ hdfs dfs -mkdir ...

クラスタ不整合

  • NameNode の初期化を行う場面があるが、DataNode 起動後に行うと DataNode でエラーが発生する(クラスタ不整合)
[/var/log/hadoop-hdfs/hadoop-hdfs-datanode-vagrant-centos65.vagrantup.com.log]
2014-02-18 16:09:55,500 FATAL org.apache.hadoop.hdfs.server.datanode.DataNode: Initialization failed for block pool Block pool BP-
862701839-192.168.33.10-1392739729485 (storage id DS-674077449-192.168.33.10-50010-1392578724409) service to localhost/127.0.0.1:8
020
java.io.IOException: Incompatible clusterIDs in /var/lib/hadoop-hdfs/cache/hdfs/dfs/data: namenode clusterID = CID-a578ba1f-9779-4
eff-9b51-a0cd3093acb2; datanode clusterID = CID-26399fe0-9d4c-4690-834e-427d7cd8d36d
  • NameNode のメタファイルが初期化されるため、NameNode が管理していないデータとなってしまう
    • DataNode の対象ディレクトリを削除し、DataNode を起動し直せば良い
    • 対象ディレクトリは /etc/hadoop/conf/hdfs-site.xml で確認できる

hdfs コマンドで mkdir をしても、実際にそこにフォルダは作られない

  • 実際に HDFS でファイルを作成しているため、各 DataNode にブロックが分散されているイメージとなる
  • hdfs dfs -lshdfs dfs -stat コマンドで確認できる

Chapter 4 "HDFS”

HDFS の特徴

  • 巨大なデータを汎用サーバーのみで扱える
  • 容量にスケーラビリティがある
  • シーケンシャルアクセスで高いスループットを出せる
    • “Write Once, Read Many” 方式
      • データの一部のみ更新することはできない
      • ランダムアクセスは想定していない
    • ブロックサイズを大きくすることで(デフォルトで64MB)、一度にまとめて大量データを扱える
    • 上記性質から、特に大量のログファイルなどの格納に向いている
  • スレーブノードの一部が故障しても、データの損失を回避できる
    • レプリケーション(レプリカ)を設定できるため、一つのブロックを複数台で保持することで損失回避

HDFS の仕組み

  • マスターサーバ(NameNode)、スレーブノード(DataNode)のマスタースレーブ構成
  • DataNode は、ファイルのデータブロックを管理する
  • NameNode は HDFS の全体的な管理を行う。また、NameNode 上で扱うファイルシステムのメタデータのメンテナンスを担うのが SecondaryNameNode である

DataNode について(4.2.2節)

  • HDFS は Java で実装されたミドルウェアであり、OS よりも上位のレイヤーで提供されるファイルシステム
  • HDFS 上に格納したファイルの実体は、各 DataNode 上のローカルファイルシステム(ext4など)上のファイルとして存在
  • HDFS 上のデータは「ブロック」と呼ばれるチャンクに分割される
    • デフォルトは 64MB
    • レプリケーション数が3の場合は、3つの DataNode 上に同じ「ブロック」がコピーされる
    • ローカルファイルシステムと違い、1MB しか利用していないときは 1MB 分のディスク領域しか利用しない
      • ただし、あまり細切れにするとブロック数が増えて処理のオーバーヘッドが増加する
    • 全てのレプリカが消失しないよう、同一サーバー・同一ラック上に存在しないように設定することが可能(ラックアウェアネス)

NameNode について(4.2.3節)

メタデータの管理

  • NameNode はデータではなく、メタデータを管理する。「どのブロックがどのファイルのどの部分であるか」
  • メタデータはメモリ上で管理される
  • メタデータには以下の情報が含まれる:ファイル名、親ディレクトリ、サイズ、所有者・所有グループ、属性、ブロックIDとDataNodeのペア(4.4節)
  • ブロックID と DataNode のペアは、DataNode からの申告をもとに動的に構築される
    • DataNode はハートビートパケットの送信タイミングで、自身が保持しているブロックを NameNode に伝える
  • その他の情報は、メモリからディスクへ同期が行われる(逆に言うと、ブロック情報は永続的には保持されない)
  • 下記情報を用いて、クラスタ起動時に初期化が行われる(Write-Ahead Log と同じ仕組み)
    • メタデータから60分に1回同期される fsimage と呼ばれるファイルシステムイメージ
    • 編集履歴を記録した edits と呼ばれる編集ログ(メモリとディスク、どちらにも書き込まれる)
  • fsimage をより最新の状態に保つ SecondaryNameNode を利用することも可能
    • NameNode から定期的に fsimage と edits を受け取り、最新の fsimage を作成して NameNode へ戻す
    • NameNode 以外に fsimage を保持することで、メタデータの完全消失も防ぐ

HDFS の使用状況の確認

  • HDFS 全体の使用状況
  • 各 DataNode における HDFS 用領域の使用状況
    • 逼迫している場合には、レプリカを別 DataNode に移す
  • 各ブロックのレプリカ数の管理
    • 足りていなかったり余分であれば、調整(作成 or 破棄)を行う

http://192.168.33.10:50070/dfshealth.jsp

上記 URL で、NameNode から HDFS の使用状況が確認できる

クライアントからのHDFSの操作の受付

  • クライアントは NameNode に読みたいファイルを問い合わせる
  • NameNode は、どのブロックがどの DataNode に存在するかを応答
  • クライアントは、DataNode に直接アクセスしてデータ(ブロック)を読みにいく

DataNode の死活監視

  • DataNode からハートビートのパケットが一定時間受信されないと、その DataNode を故障と見なす
    • ハートビートパケットは、デフォルトでは3秒ごとに送信される

HDFS のファイル読み書きの流れ

  • 書き込み:図4.4(P.69)
    • フロー
      • クライアント -> NameNode: ファイルオープン要求
      • NameNode -> クライアント: ストリーム返却(?)
      • クライアント : ブロックサイズに分けたファイル(パケット)を、データ送信キューに追加
      • クライアント : NameNode に、DataNode のブロック割当を要求
      • NameNode -> クライアント: DataNode パイプラインを返却
      • クライアント : ack 待ちキューに全パケットを保存
      • クライアント -> DataNode: DataNode パイプラインの先頭 DataNode にデータを書き込む
      • 先頭DataNode -> 後続DataNode: パケット伝搬。全 DataNode に書き込めたら、ack 待ちキューから該当パケットを削除
    • 書き込み障害時
      • ack キューに残っているパケットを、データ送信キューに戻し、障害が発生した DataNode を切り離してパイプラインを形成する
  • 読み込み:図4.5(P.71)
    • 書き込み手順と同様、NameNode に問い合わせ、帰ってきた DataNode にクライアントが問い合わせに行く
    • 読み込みの手間を省くため、クライアントはそれぞれの DataNode が持つ対象データのブロックを、DataNode ごとに一度に読み込む

Chapter 5 "MapReduce フレームワーク"

個々のフェーズの動き

Map 処理(図5.2)

  • 入力単位は「スプリット」
    • 通常は、HDFS のファイルのブロックが割り当たる
    • 1レコードずつ読み込み、Map 処理を行う
    • (?) キーバリュー解釈
  • Map 処理後の中間ファイルは、ローカルディスクを利用する

Shuffle & Sort 処理(図5.3)

  • 中間ファイルはキーごとにソートされており、Partitioner が対象の Reducer を決める
    • Partitioner はデフォルトではキーのハッシュ値の剰余で定まる
    • Mapper と Reducer が同じ場合はノード間のデータ通信が発生しない
    • ノード数が多いと、必然的にノード間のデータ通信量が増え、処理全体の性能問題になりやすい
  • Reducer に集まった中間ファイル群はマージソートされ、それぞれの Key ごとに集められる(パーティション)

Reduce 処理(図5.4)

  • 入力単位は「パーティション」
    • 中間ファイル(ローカルディスク上)が割り当たる
  • 出力先は HDFS
  • Map 処理と違い、分散しにくい

アーキテクチャ

JobTracker の役割

MapReduce フレームワークが提供する分散処理を制御するためのマスターとして動作する Java プロセス。

単一障害点のなり得るが、HA クラスタ構成を取ることも可能(15章「可用性の向上」)。

  • ジョブの管理
    • Map タスクの割り当て制御
    • Map 処理結果把握
    • ジョブ進捗通知
  • リソース管理
    • 処理の割り当て
    • 処理の投機的実行
    • 処理再割り当て
    • ブラックリスト化
    • TaskTracker 死活監視
    • TaskTracker 追加/切り離し
  • ジョブ実行履歴の管理

TaskTracker の役割

Map 処理、Reduce 処理を実行するスレーブノード。

実行においては、「Child プロセス」と呼ばれる Java プロセスを生成する。Child プロセスが処理を終えると、TaskTracker は JobTracker に処理の完了通知を行う。

  • Child プロセスの生成と処理実行
  • Child プロセスの状況確認
  • 処理停止の通知
  • ハートビート通信
  • Map 処理数と Reduce 処理数の把握

JobClient の役割

ユーザーが定義した MapReduce 処理を JobTracker に依頼するためのクライアント。

  • 入力データの分割方針の決定
  • ジョブの依頼
  • アプリケーションの配布
    • データではなく、アプリケーション自体を配布し、通信量を削減する(データローカリティ)
  • 進捗状況の受信
  • ジョブの管理

動作確認

JobTracker 管理画面

http://localhost:50030/jobtracker.jsp

  • 上記画面で、JobTracker のステータスが確認できる
  • 404 Not Found となる場合は、JobTracker を再インストールするのがよい…と Web 上では散見される(未試行)

Tips

サンプルの実行ユーザーについて

5.5.3 節の動作確認は適切なユーザーでコマンドを実行する必要がある。特に root ユーザーは HDFS 上での特権ユーザーとならないため、注意が必要である(HDFS 上の特権ユーザーは、NameNode 実行ユーザー)

[誤]
[root@vagrant]#       hadoop jar hadoop-examples.jar ...
[正1]
[sampleuser@vagrant]$ hadoop jar hadoop-examples.jar ...
[正2]
[hoge@vagrant]$       sudo -u sampleuser hadoop jar hadoop-examples.jar ...

また、実行権限などが付与されていない場合は、hdfs dfs -chown コマンドなどで随時調整する。

Chapter 6 "Hadoop アプリケーションを動かす"

それぞれの文法などはともかく、特徴や動かし方の概要を学ぶ章です。

MapReduce アプリケーション(Java)(6.2章)

特徴

HadoopStreaming(6.3章)

詳細は11章。

特徴

  • 標準入出力経由で処理データが受け渡される ** 標準入出力が扱える言語であれば何でも良い
  • キーバリュー間はタブ、キーバリューのペア間は改行区切り
  • Mapper の出力はそのまま Reducer に渡されるため、Shuffle & Sort の処理は行われない(リスト6.2)
results = {}

def  wc_reduce(line):
    key, value = re.split(r'/t', line.strip())    # /t? \t ではなくて?
    if not key in results:
        results[key] = 0
    results[key] = results[key] + int(value)

Pig(6.4章)

Pig Latin と呼ばれる DSL を提供するフロントエンド。詳細は12章。

特徴

  • データ処理に特化した言語
    • Pig Latin で記述した処理が MapReduce ジョブに変換されるため、MapReduce の詳細や MapReduce ジョブがどのように制御されているかが隠蔽される
  • 少ないコードで記述出来る
    • Java で MapReduce を書くより記述量が減る
  • 多段処理に有利
    • あるデータ処理の出力を、別のデータ処理の入力として扱うといった多段処理が楽に行える

サンプル

下記内容をファイルにし、pig コマンドの引数に与えるか(バッチモード)、pig コマンドを実行した後のシェルにて一行ずつ実行することができる(インタラクティブモード)

records = LOAD 'input' AS (line:chararray);
words = FOREACH records GENERATE flatten(TOKENIZE(line)) as word;
word_group = GROUP words by word;
word_count = FOREACH word_group GENERATE group AS word, COUNT(words) as count;
word_count = ORDER word_count by word;
STORE word_count INTO 'sample_pig_batch/output';
  • データ入出力
    • LOAD, STORE
  • データ加工
    • FOREACH <入力リレーション> GENERATE <処理対象のfield>
      • 入力リレーションの特定の field を指定したり、UDF(=User Defined Function)や演算子を適用できる
  • データ集約
    • GROUP <入力リレーション> BY
    • ★どんな結果になるのか調べる
  • ソート
    • ORDER <入力リレーション> BY
  • flattern

Hive(6.5章)

HiveQL と呼ばれる DSL を提供するフロントエンド。詳細は13章。

特徴

  • データ処理に特化した言語
    • HiveQL で記述した処理が MapReduce ジョブに変換されるため、MapReduce の詳細や MapReduce ジョブがどのように制御されているかが隠蔽される
  • SQL とある程度の互換性をもつ
    • 学習コストが低い

Chapter 7 "Hadoop クラスタ環境の構築"

クラスタ環境の構築

  • 名前解決ができるよう、/etc/hosts に IPアドレス、FQDN を設定
    • 当然だが、DNS で名前解決ができれば不要
  • iptables 等、FW で接続設定

HDFS

  • core-site.xml
    • fs.defaultFS プロパティに NameNode のサーバ名を指定
  • hdfs-site.xml
    • データを保存するディレクトリの指定
      • NameNode, DataNode それぞれで見るプロパティが違うため、同じファイルにしてよい
    • レプリケーション数の設定
  • hosts.include, hosts.exclude
    • NameNode への接続を許可するサーバの FQDN を hosts.include に指定
    • 各ファイルへのパスは hdfs-site.xml で指定

MapReduce

  • mapred-site.xml
    • mapred.job.tracker プロパティに JobTracker のサーバ名とポート番号を指定
    • mapred.system.dir プロパティに MapReduce 用のシステムディレクトリを指定
    • map, reduce のタスク数もこのファイルで指定する
      • 例)8コアのマシンであれば、DataNode, TaskTracker に1コアずつ、残りを Map, Reduce タスクに割り振る
  • hosts.include, hosts.exclude
    • JobTracker への接続を許可するサーバの FQDN を hosts.include に指定
    • 各ファイルへのパスは mapred-site.xml で指定

結局何で作るか

Docker?

  • 履歴管理できるので、慣れれば楽に環境構築できそう
  • 今回は時間がなかったので断念

Vagrant?

  • ローカルマシンに複数仮想マシンだとメモリ足らない(Docker も一緒ですが)
  • 設定ファイルは共通でよいので、1つ image を作ってしまえば、使い回せる。共通の image を作成し、複数インスタンス立ち上げられる
  • image の作成
$ vagrant package default -out centos_with_cdh42.box
$ vagrant box add centos65_with_cdh42 ut
  • Vagrantfile
VAGRANTFILE_API_VERSION = "2" 
Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
  config.vm.define :default do |default|
    default.vm.box = "centos65_with_cdh42"
    default.vm.network :private_network, ip:"192.168.33.10"
    default.vm.network :public_network
  end 
  config.vm.define :web do |web|
    web.vm.box = "centos65_with_cdh42"
    web.vm.network :private_network, ip:"192.168.33.11"
    web.vm.network :public_network
  end 
  config.vm.define :db do |db|
    db.vm.box = "centos65_with_cdh42"
    db.vm.network :private_network, ip:"192.168.33.12"
    db.vm.network :public_network
  end 

  config.vm.provider :virtualbox do |vb|
    vb.gui = true
    vb.customize ["modifyvm", :id, "--memory", "512"]
  end 
end

AWS?

Chapter 8 "MapReduce プログラミングの基礎 -Javaによる開発(1)-"

ソースコードからのプログラム実行(8.2節)

  • WordCount.java(上記サンプルコードよりダウンロード可能)
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool {
  public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(getConf(), "word count");
    job.setNumReduceTasks(2);
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    TextInputFormat.addInputPath(job, new Path(args[0]));
    TextOutputFormat.setOutputPath(job, new Path(args[1]));
    job.getConfiguration().setBoolean("mapred.used.genericoptionsparser", true);
    return(job.waitForCompletion(true) ? 0 : 1);
  }

  public static void main(String[] args) throws Exception{
    int res = ToolRunner.run(new Configuration(), new WordCount(), args);
    System.exit(res);
  }
}

各種 API

Mapper

http://hadoop.apache.org/docs/r1.2.1/api/org/apache/hadoop/mapreduce/Mapper.html

Reducer

http://hadoop.apache.org/docs/r1.2.1/api/org/apache/hadoop/mapreduce/Reducer.html

Writable, Comparable, WritableComparable

入出力データの型が継承すべき抽象クラスたち。

  • データ入出力においてシリアライズが必須なため、Hadoop 独自の軽量 Writable クラスが提供されている
  • Shuffle & Sort の処理において比較可能である必要があるため、Comparable クラスが提供されている
  • 各データ型のクラスは、WritableComparable クラスを implements する

InputFormat, OutputFormat

  • TextInputFormat, TextOutputFormat
  • DBInputFormat, DBOutputFormat もある(詳しくは第9章、第10章)

Chapter 9 "MapReduce プログラミングの基礎 -Javaによる開発(2)-"

入力データの取り扱いの制御(9.2節)

  • 以下、クラスの例において例外処理は割愛する
  • 独自クラスや、デフォルトで設定されている処理クラスを変更する場合は、Job オブジェクトの set** メソッドでクラス名を指定すればよい

InputFormat(9.2.1節)

API doc: http://hadoop.apache.org/docs/r1.2.1/api/org/apache/hadoop/mapred/InputFormat.html

public abstract class InputFormat<K,V> {
    public abstract List<InputSplit> getSplits(JobContext context);
    public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context);
}
  • getSplits は、入力データ全体を Map タスクごとの入力になる "split" という単位に分割する
    • つまり、split の数が Map タスクの数になる
    • FileInputFormat と、これを継承した TextInputFormat の場合は、HDFS のブロック1つ1つが InputSplit と対応する
      • 例外:
        • mapred.max.split.size に、ブロックサイズよりも小さい値が設定されている場合
        • 入力ファイルが圧縮されている場合は、ファイル1つに対して InputSplit が1つとなる(分割できないため)
  • InputSplit の情報は HDFS 上の一時ファイルに書き込まれ、各 Map タスクで実行される
    • つまり Writable クラスを implements している必要がある

InputSplit(9.2.3節)

API doc: http://hadoop.apache.org/docs/r1.2.1/api/org/apache/hadoop/mapred/InputSplit.html

public abstract class InputSplit extends Writable {
    public abstract long getLength();
    public abstract String[] getLocations();
}
  • InputSplit の情報は HDFS 上の一時ファイルに書き込まれ、各 Map タスクで実行される
    • つまり Writable クラスを implements している必要がある
  • getLength() は、InputSplit のデータサイズをバイト数で返す
  • getLocations() は、対応するデータブロックをもっているスレーブノードのリストを返す
    • JobTracker は、getLocations() の返り値から、データのローカリティを考慮しつつタスクをスレーブノードに割り当てる
  • 実用上は上記2つのメソッドだけでは不十分。InputSplit の子クラスである FileSplit は、getPath, getStart というメソッドを新たに追加している

RecordReader(9.2.2節)

API doc: http://hadoop.apache.org/docs/r1.2.1/api/org/apache/hadoop/mapred/RecordReader.html

public interface RecordReader<K,V>:
 void	close() 
          Close this InputSplit to future operations.
 K	createKey() 
          Create an object of the appropriate type to be used as a key.
 V	createValue() 
          Create an object of the appropriate type to be used as a value.
 long	getPos() 
          Returns the current position in the input.
 float	getProgress() 
          How much of the input has the RecordReader consumed i.e.
 boolean	next(K key, V value) 
          Reads the next key/value pair from the input for processing.
  • Mapper の map メソッドに渡るレコードを表すクラス
  • InputFormat クラスの createRecordReader メソッドにより生成される
  • RecordReader は実際の Map 処理で必要となるため、スレーブノード上で実行される(InputSplit はジョブを投入する前に必要なため、クライアント側で実行される)

出力データの取り扱いの制御(9.3節)

  • 入力データの場合と同じように、OutputFormat, RecordWriter インタフェースが存在する(split の概念はないため、InputSplit に対応するクラスはない)。

独自のデータ型を定義する(9.4節)

(Writable, WritableComparable の説明なので割愛)

Shuffle フェーズでの動作を制御する(9.5節)

Partitioner(9.5.1節)

public abstract class Partitioner<KEY, VALUE> {
    public abstract int getPartition(KEY key, VALUE value, int numPartitions);
}
  • デフォルトでは HashPartition が利用され、key のハッシュ値の剰余を取っている
  • 独自の Partitioner クラスを定義し、Configuration インスタンスに設定することも可能

Comparator(9.5.2節)

  • 同じ Reducer に処理をさせるために、独自の比較クラスを定義することが可能

Combiner(9.5.4節)

  • Reduce タスクに渡す前に、Reduce タスクと同等の処理を Map タスクのノードで実行できるのが Combiner
    • Map タスクの結果を Partitioner が分類し、Combiner が実行される
  • Reduce タスクを処理する Reduce クラスと同じクラスを割り当てることも可能(カウント系の処理など、ローカル上で集約可能な場合)

MapReduce アプリケーションのテストとデバッグ(9.6節)

開発のコツ

  • 一部の不正なデータでジョブ全体が失敗することを避けるため、不正なデータが来てもジョブを継続させる
    • MapReduce ジョブ全体で例外を投げる場面は限定的
  • Hadoop MapReduce アプリケーション開発では、少ない台数と小さなデータで検証を始める
    • 段階的にクラスタ環境とデータ規模を大きくし、手戻りを小さくする
  • 基本的に Java での開発であれば何でも出来てしまうが、MapReduce の作法に従うことでスケーラビリティと耐障害性を確保する
    • タスク間で通信せず、共通のリソースに出来る限りアクセスしない

テスト

  • 単体テストは MRUnit を利用すると良い
    • map, reduce は Context クラスの write メソッドで出力されるため、返り値で捕捉できない
  • MapReduce アプリケーションのデバッグ難易度は高いため、通常のアプリケーションよりも高品質なコードが求められる
public class WordCountTest extends TestCase {
    private WordCount.TokenizerMapper mapper;
    private MapDriver driver;

    @Before
    public void setUp() {
        mapper = new WordCount.TokenizerMapper();
        driver = new MapDriver(mapper);
    }

    @Test
    public void testWordCountMapper() {
        driver.withInput(new LongWritable(0), new Text("this is a pen"))
              .withOutput(new Text("this"),   new IntWritable(1))
              .withOutput(new Text("is"),     new IntWritable(1))
              .withOutput(new Text("a"),      new IntWritable(1))
              .withOutput(new Text("pen"),    new IntWritable(1))
              .runTest();
    }
}
  • MapDriver オブジェクトを利用し、入力と出力のテストを行う例

参考

  • 入力をデータベースから行う場合の方法や注意点について

    • http://blog.cloudera.com/blog/2009/03/database-access-with-hadoop/
    • DBInputFormat クラスによる実装
    • バルクロードして HDFS に突っ込んだ方が良いとの結論…
      • Mapper の数だけ、同じようなクエリが同時に発行されるため、DBサーバがボトルネックになりやすい
      • OLTP だとデータが常に変化するため、MapReduce の処理が書きづらい
  • SQL データベースと Hadoop を連携させるツールに Sqoop がある

Sqoop (“SQL-to-Hadoop”) is a straightforward command-line tool with the following capabilities:

Imports individual tables or entire databases to files in HDFS
Generates Java classes to allow you to interact with your imported data
Provides the ability to import from SQL databases straight into your Hive data warehouse

http://blog.cloudera.com/blog/2009/06/introducing-sqoop/

Chapter 10 "MapReduce プログラミングの基礎 -Javaによる開発(3)-"

圧縮データを扱う(10.2節)

  • コーデック実装として gzip, bzip2 も利用可能だが、MapReduce アプリケーションにおいては速度重視の SnappyCodec がよい
  • 入力ファイルが圧縮されている場合は、ファイル1つにつき InputSplit が1つ作成される
    • データブロックごとに独立して伸張できる保証がないため

MapReduce での処理に適したファイルフォーマット(10.3節)

  • MapReduce アプリケーションが入出力として利用するキーバリューをバイト列として格納するためのファイル形式として、SequenceFile がある
    • ジョブとジョブの間での変換処理が減り、効率的
    • hdfs コマンドで簡単に中身を確認することもできる
    • 圧縮方法には、RECORD, BLOCK の2種類がある
      • RECORD: キーバリューのバリューを圧縮してファイルに出力する
      • BLOCK: ブロック単位で圧縮してファイルに出力する(推奨)

http://hadoop.apache.org/docs/r1.2.1/api/org/apache/hadoop/io/SequenceFile.html

一度に複数のファイルフォーマットを扱う(10.4節)

  • MultipleInputs
    • 複数のインプットに対し、それぞれ Mapper を定義する
MultipleInputs.addInputPath(job,
                            new Path("/input/txt"),
                            TextInputFormat.class,
                            FooMapper.class);                     // 独自定義する場合
MultipleInputs.addInputPath(job,
                            new Path("/input/seq"),
                            SequenceFileAsTextInputFormat.class,
                            BarMapper.class);                     // 独自定義する場合
  • MultipleOutputs
    • reduce メソッド内にて、出力を切り替えることが出来る
private MultipleOutputs mos;
...
public void reduce(Text key, Iterable<IntWritable> values, Context context) {
    ...
    mos.write("text", key, result);
    mos.write("seq",  key, result);
}

(いつ使うんだろう?(特にOutput))

分散キャッシュの利用(10.5章)

  • 入力データに付随して必要となるデータがある場合、各スレーブノードにファイルを配布することが可能
    • 複数タスクで共有されるため、不要なファイル転送を避けることができる
  • 指定方法は2つ
    • ジョブ起動時の hadoop オプションに -files オプションとして渡すことが可能(HDFS 上になくとも良い)
    • DistributedCache クラスが提供する API を利用する(HDFS 上にないと駄目)

Chapter 11 "HadoopStreaming"

HadoopStreaming では標準入出力を用い、慣れ親しんでいる言語で MapReduce アプリケーションが開発できる。性能面では Java での MapReduce アプリケーションに劣るが、処理時間が許容できる範囲であればメリットも大きい。

本章は HadoopStreaming の簡易さから内容が薄いため、応用方法について調べたことを中心に紹介する。

HadoopStreaming アプリケーションの構成(11.2節)

  • 構成
    • Map 処理を定義する実行可能ファイル
    • Reduce 処理を定義する実行可能ファイル
  • 処理の順序
    • Map 処理は、標準入力から入力セットを受け取る
    • Map 処理は、キーバリューを標準出力に出力する
    • Reduce 処理は、標準入力から Map 処理の出力を受け取る
    • Reduce 処理は、キーバリュー(処理結果)を標準出力に出力する
  • 注意点
    • Shuffle 処理がないため、Map 処理が出力したキーバリューを全て Reduce 処理で扱わなければいけない

HadoopStreaming の活用例(11.3節)

データのフィルタリング(11.3.1節)

Python や Ruby などのスクリプト言語が文字列処理に長けていることを利用する

R 言語との連携

本書では、R の Python バインディングである PypeR が紹介されている。

レコード間の関係を処理することができないため、Map / Reduce 処理中に R での統計処理を挟むのは難しそう。

以下まとめ

  • オプション色々あるけど、 -info オプションで確認できる
  • Combiner もスクリプトで渡せる
  • InputFormat, OutputFormat, Partitioner のクラスをデフォルトから変更できる
    • デフォルトは TextInputformat, TextOutputFormat
  • file オプションを利用することで、HDFS 上に無いローカルファイルを各ノードに配布可能
    • Map 処理、Reduce 処理のスクリプトに利用したり、各処理の共通データ/設定として利用
  • files オプションを利用することで、HDFS 上にあるファイルを各ノードに配布可能
  • libjars オプションで Jar ファイルを追加可能
    • 独自実装した InputFormat クラスを指定する場合などに利用

動かしてみる(Python)

mapper

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import re
import sys

def wc_map(line):
    return [(key, 1) for key in re.split(r'\s', line.strip()) if key]

def output(records):
    for key, value in records:
        print '{0}\t{1}'.format(key,value)

for l in sys.stdin:
    output(wc_map(l))

reducer

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import re
import sys

results = {}

def wc_reduce(line):
    key, value = re.split(r'\t', line.strip())
    if not key in results:
        results[key] = 0
    results[key] = results[key] + int(value)

def output(records):
    for key, value in records:
        print '{0}\t{1}'.format(key,value)

for l in sys.stdin:
    wc_reduce(l)
output(sorted(results.items()))

動かす

  • 失敗
$ sudo -u sampleuser hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.2.1.jar -input input -output sample_hadoopstreaming/output -mapper /home/sampleuser/python/mapper.py -reducer /home/sampleuser/python/reducer.py
...
14/04/15 18:11:11 ERROR streaming.StreamJob: Job not successful. Error: NA
14/04/15 18:11:11 INFO streaming.StreamJob: killJob...
Streaming Command Failed!
  • ログ確認
# less /var/log/hadoop-0.20-mapreduce/hadoop-hadoop-tasktracker-vagrant-centos65.vagrantup.com.log
Caused by: java.io.IOException: Cannot run program "/home/sampleuser/python/mapper.py": error=13, Permission denied
  • /home/sampleuser/python/mapper.py に実行権限が必要(フォルダにも)
$ chmod +x /home/sampleuser
  • 結果の取得
$ hdfs dfs -get sample_hadoopstreaming/output

残骸(Scheme)

処理系は Guile

mapper

(use-modules (ice-9 readline))
(activate-readline)

(define (split-writer line)
    (map (lambda (token)
             (display token)
             (display "\t")
             (display "1")
             (display "\n"))
         (string-tokenize line)))

; Execute
(let loop ((line (readline)))
    (if (not (eof-object? line))
        (begin (split-writer line)
               (loop (readline)))))

reducer

(use-modules (ice-9 readline))
(activate-readline)

(define ht (make-hash-table))

(define (counter line)
    (let* ((kv    (string-split line #\tab))
           (word  (list-ref kv 0))
           (count (list-ref kv 1)))
        (if (hash-ref ht word)
            (hash-set! ht word (+ (hash-ref ht word) count))
            (hash-set! ht word count))))

; Execute
(let loop ((line (readline)))
    (if (not (eof-object? line))
        (begin (counter line)
               (loop (readline)))))

; print
(hash-map ht (lambda (k v) (display k) (display "\t") (display v) (display "\n")))
  • readline がなぜかタブをスペースに変換してしまい、上手く動かない…

Chapter 12 "データフロー型処理言語 Pig"

Java や Streaming で MapReduce を利用する場合、ジョブの定義やデータ型(Writable)、MapReduce の順番、データの結合方法やキーの持ち方など、考慮すべき点が非常に多い。

MapReduce のフレームワークとして Apache Pig があり、MapReduce 特有の処理は Pig が受け持ち、開発者は処理のロジックに集中することができる。また、対話的に実行可能で、処理を試行錯誤しながら試すことが可能である。

Pig の特徴(12.2節)

メリット

  • Pig Latin 言語でデータの流れに着目して処理内容を定義できる
  • よく利用する処理について、あらかじめ機能が用意されている
    • データのソート、結合などは容易に記述可能
  • Java に比べて少ない記述量
    • より柔軟に記述したい場合は UDF(User Defined Function)が利用可能
  • MapReduce ジョブの構造を意識せずに処理内容を定義できる
    • 構文ごとに map/reduce メソッドの割り当てが行われるため、どちらで何の処理をするかを意識しなくてよい
  • 処理するタイミングでデータ意味付けする
    • ★意味するところを詳細に記述★

Hive との比較

「PigとHive何が違うの?」/ TechSketch http://tech-sketch.jp/2012/08/hadoop-pig.html

  • 性能面で不利
  • 処理は書きやすい
    • Hive だとサブクエリを利用することになり、複雑になりがち
    • GROUP した後、集合演算が必須ではない(集合値が手に入る)

Pig でのデータ管理(12.3節)

  • field
    • データ型によって表現される個々の値
    • 単純型(int, long, float, double, chararray, bytearray)と複合型(tuple, bag, map)で区別
  • tuple
    • field の集合。丸括弧で表す
    • (A, 1, 2)
    • (1,{(1,2,3)})
  • bag
    • tuple の集合。波括弧で表す
    • {(A, 1, 2), (B, 3, 5)}
  • map
    • key と value の形式でデータを扱う。key は chararray 型のみ
    • [ key1#value1, key2#value2 ]
  • リレーション(外部バッグ)
    • Pig Latin でデータを処理する単位で、tuple の集合。データ構造的には bag と同じ
    • RDBMS でいうと、tuple が行、リレーションが表 となる
# 外部バッグ
A = LOAD 'data' as (f1:int, f2:int, f3;int);
DUMP A;
(1,2,3)
(4,2,1)
(8,3,4)
(4,3,3)

# 各タプルの2つ目は内部バッグと呼んで区別する
# X 自体も外部バッグ
X = GROUP A BY f1;
DUMP X;
(1,{(1,2,3)})
(4,{(4,2,1),(4,3,3)})
(8,{(8,3,4)})

Pig での処理方法(12.4節)

要点のみピックアップする

データ加工方法(12.4.2節)

FOREACH GENERATE 文で特定のフィールドのみを対象とした変換が可能:

<出力リレーション> = FOREACH <入力リレーション> GENERATE <fields>
例)
data  = GROUP data BY group_id;
count = FOREACH data GENERATE group, COUNT(data); 
  • GENERATE の後に、更に FOREACH GENERATE を入れ子にすることが可能
  • AS を使ってフィールド名を付与することが可能

特定のフィールドでのデータ集約/ソート(12.4.4節)

  • 集約には GROUP, COGROUP が使える
  • PARTITION BY で Partitioner クラスの指定が可能(Mapper)
  • PARALLEL で集約処理自体の分散化が可能(Reducer)

データ結合方法(12.4.5節)

  • INNER JOIN, OUTER JOIN が利用可能

関数(12.5節)

要点のみピックアップする

ユーザーが定義する関数(12.5.3節)

  • Pig に含まれていない関数については、ユーザーが独自に作成可能(User Defined Function)
    • 評価関数を作成する場合は、EvalFunc を継承し、exec メソッドを実装
    • フィルタ関数を作成する場合は、FilterFunc を継承し、exec メソッドを実装(論理値を返す)
    • 集約関数を作成する場合は、EvalFunc を継承し、実装(詳細略)
  • UDF は Java だけでなく、Python, JavaScript, Ruby , Groovy でも定義可能

ユーザー定義関数の実装(12.8節)

  • exec メソッドの引数は Tuple オブジェクト
    • Tuple の何番目かを Tuple.get メソッドで取得する
    • Tuple.get は Object を返すので、キャストが必要
  • 分散キャッシュを利用した UDF も定義可能。アイテムコードからアイテム名への変換など(12.8.3節)

年齢を年代に変換する関数

public class CHECKAGE extends EvalFunc<Integer> {
    public CHECKAGE() {}

    @Override
    public Integer exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0) {
            return null;
        }
        Integer age = (Integer) input.get(0);

        if ( age < 20 ) {
            return 1;
        } else if ( age >= 60 ) {
            return 3;
        } else {
            return 2;
        }
    }
}

Pig におけるデバッグ(12.9節)

4つの構文が用意されている

  • DESCRIBE
    • リレーション内のスキーマの確認
    • JOIN, FOREACH-GENERATE などでスキーマが変化するため、データ構造が変だと思ったときに叩いてみる
  • DUMP
    • データの中身が見たいときに使う
    • LIMIT 掛けないとコンソールが爆発するので注意(DUMP 自体に LIMIT が付けられる訳ではない)
    • データを読み込んだ直後の DUMP でも MapReduce 処理が走る
      • データが HDFS 上にあるため、集めるために MapReduce 処理を行わなければならない
  • EXPLAIN
    • 定義した構文が、どのように MapReduce ジョブに反映されるか確認するために使う
  • ILLUSTRATE
    • 指定したリレーションを生成するまでの処理を1ステップずつ実行し、その結果を標準出力に表示する

動かしてみる

トラッキングデータの集計

  • トラッキングデータ

| member | visit | page_view | date | ip_addr | site | page | referring_site | referring_page | referring_query | user_agent | os | app_key |

$ pig -x local

# 読み込み
grunt> TRACKING_DATA = LOAD 'tracking.log.csv' USING PigStorage(',') AS (member:chararray,visit:int,page_view:int,date:chararray,ip_addr:chararray,site:chararray,page:chararray,referring_site:chararray,referring_page:chararray,referring_query:chararray,user_agent:chararray,os:chararray,app_key:chararray);

# MapReduce ジョブが実行され、その後にダンプ結果が表示される 
grunt> DUMP TRACKING_DATA

# member でグループ化
grunt> MEMBERS = GROUP TRACKING_DATA BY member;

# member ごとに PV をカウント
grunt> MEMBER_PV = FOREACH MEMBERS GENERATE group AS member, SUM(TRACKING_DATA.page_view) AS count;

grunt> DESCRIBE MEMBER_PV;
MEMBER_PV: {member: chararray,count: long}

grunt> DUMP MEMBER_PV;
(fcecc720-29a7-yyyy-xxxx-d4bed9ef2705,1)
(fd8a4700-29a4-yyyy-xxxx-0022196ba192,3)
(fd8cdb10-29d0-yyyy-xxxx-d4bed9ef2705,15)

# ソート
grunt> MEMBER_PV = ORDER MEMBER_PV BY count DESC;
  • EXPLAIN が難しい

PV 数が多い上位ユーザーへのレコメンドアイテムの抽出

  • レコメンドデータ(p)

| member | session_id | impression_date | click_date | conversion_date | position | item_group | item_code | item_name | visit | page_view | ip_addr | site,page | area | weather | temperature |

grunt> MEMBER_PV_TOP10 = LIMIT MEMBER_PV 10;

# 読み込み
grunt> RECOMMEND_DATA = LOAD 'recommendation.p.log.csv' USING PigStorage(',') AS (member:chararray,session_id:chararray,impression_date:chararray,click_date:chararray,conversion_date:chararray,position:chararray,item_group:chararray,item_code:chararray,item_name:chararray,visit:int,page_view:int,ip_addr:chararray,site:chararray,page:chararray,area:chararray,weather:chararray,temperature:chararray);

# 左外部結合
grunt> TOP10_MEMBER_ITEMS = JOIN MEMBER_PV_TOP10 BY member LEFT, RECOMMEND_DATA BY member;

# 保存
grunt> STORE TOP10_MEMBER_ITEMS INTO 'top10_member_items' USING PigStorage(',');

# 終了
grunt> quit

Chapter 13 "SQL ライクインターフェイス Hive"

Pig と同じく MapReduce を容易に活用するための手段として、SQL ライクな言語を通じて操作するインターフェイスの Hive が開発された。

Hadoop と Hive(13.2節)

Hive のシステム構成(13.2.2節, 13.3.2節, 13.4.1節, 13.4.3節)

  • Hadoop のノードとは別建てで、Hadoop の JobClient のように動作する(Hive クライアント)
  • メタストアと呼ばれる Hive のメタ情報を管理するノードにアクセスし、そこで得た情報から Hadoop にアクセスする
    • メタストアは RDBMS で管理される(Cloudera 版の Hadoop だと Derby。MySQL, PostgreSQL も可)
  • Hive クライアント, メタストア, RDBMS の取り方から、3つのバリエーションがある(設定方法は 13.4.4節)
    • 組み込みモード:Hive クライアント、メタストア、RDBMS(Derby)が単一プロセスとして存在。お試し用。
    • ローカルモード:RDBMS を独立させて動作させるモード。複数のアクセスを同時に受け付けられるが、Hive クライアントは全て同じノード上に存在する必要がある
    • リモートモード:Hive クライアント・メタストア・RDBMS 全て独立させたモード。
  • Hive のデータは HDFS 上のファイルとして存在している:
    • hdfs://user/hive/warehouse/<データベース名>.db/<テーブル名>/<読み込んだファイル>
    • 例)/user/hive/warehouse/db1.db/table1/data.csv
  • Hive のデータにインデックスの概念はなく、このままだと毎回全走査する必要がある。これを防ぐために、パーティションという概念がある
    • 特定のキー値で、物理的に HDFS 上のファイルを分けてしまうもの:
    • hdfs://user/hive/warehouse/<データベース名>.db/<テーブル名>/<キー>=<値>/<読み込んだファイル>
    • HiveQL では通常通り、WHERE 句の条件等で指定すればよい

※ データベースには default が最初から存在しており、これを利用する場合はデータベースのパスが省略される

Hive と RDBMS の違い(13.2.1節)

  • オンライン処理に不向き(結局 Hadoop なので)
    • 大容量のデータを SQL ベースで低レイテンシで処理するためのエンジンの開発も近年では盛ん。Google の Dremel, Cloudera の Impala など。
  • トランザクション管理機能がない
    • 更新の概念はないが、並列実行して、1つでも失敗した場合に全体として失敗させる…などとする場合には、ユーザー自身でジョブの管理と不要な処理結果のクリーンアップが必要。
  • 行更新不可
    • 扱うのは HDFS 上のファイルのため

HiveQL(13.3節)

テーブル定義(13.3.3節)

CREATE TABLE 文でテーブル定義を行う。

  • データ型には TIMESTAMP があるため、日付も扱いやすい
  • PARTITIONED BY 句により、指定のカラム名でパーティショニング可能(13.3.4節)
  • CLUSTERED BY 句により、テーブルやパーティションに対してバケット数を指定して分割することが可能(13.4.1節)
  • STORED AS 句により、HDFS 上にファイルを格納する際のフォーマットを指定する。
    • RCFILE は列指向のため、指定されたカラムにだけアクセスできるようになる。大量の列をもち、一部のカラムにしかアクセスしない場合などに I/O 量を大幅に減らすことが出来るため、性能向上が見込める(13.4.2節)

データの投入とテーブルからのファイル出力(13.3.4節)

HDFS 上のファイルを読む場合は LOAD, Hive のテーブルから読む場合は INSERT を利用する。ここでの INSERT は SQL のように1行1行挿入するものではないため、利用する際には文法や挙動を確認する。

また、Hive のテーブルのデータを HDFS 上に書き出す場合も INSERT を利用する。

LOAD 文によるファイルシステム上からのデータ投入

LOAD DATA [LOCAL] INPATH '<ファイルパス>' [OVERWRITE] INTO TABLE <テーブル名> [PARTITION (col1=val1, col2=val2, ...)];
  • LOCAL 句を指定した場合は、ローカルファイルシステム上のファイルをロードする
  • HDFS 上のファイルを指定した場合は、move 扱いとなり、元のパスからは消える
  • OVERWRITE 句を指定した場合は置き換え、指定しない場合は追記となる
  • PARTITION 句を指定すると、特定パーティションに関するデータのみをロードする

INSERT TABLE 文によるテーブルからのデータの投入

INSERT {OVERWRITE, INTO} TABLE <テーブル名> [PARTITION (col1=val1, col2=val2, ...)] [IF NOT EXISTS] <SELECT句> FROM <FROM句>;
  • IF NOT EXISTS は、テーブル内にデータが存在しない場合にのみデータ投入する指定となる

マルチテーブルインサート

一つのテーブルから、複数のテーブルへ振り分ける処理。INSERT TABLE 文で、FROM 句を先頭にもってきて、その後に INSERT {OVERWRITE, INTO} を列挙する。

例)
FROM table_src
    INSERT INTO      TABLE table_dst1 SELECT col1, col2 WHERE col1 >= 0
    INSERT OVERWRITE TABLE table_dst1 SELECT col1, col2 WHERE col1 <  0;

動的パーティションインサート

同一テーブルの複数パーティションに、自動的に振り分けながらデータを投入する。

INSERT {OVERWRITE, INTO} TABLE <テーブル名> PARTITION (col1, col2, ...) <SELECT句> FROM <FROM句>
  • 文法としては、col1=val1 の =val1 を取り除いた形
  • 動的パーティションインサートはデフォルトでは off となっているため、オプションを有効にする
  • デフォルトでは、全てのパーティションではエラーとなる(strict)

INSERT DIRECTORY 文によるテーブルからのファイル出力

クエリの実行結果をファイルシステム中のディレクトリ内に出力させることができる。

INSERT OVERWRITE [LOCAL] DIRECTORY directory1 SELECT ... FROM ...
  • INSERT DIRECTORY もマルチテーブルインサート可能

SELECT 文(13.3.5節)

GROUP BY

通常は Reduce で処理されるが、Map で集約することで Reduce 処理への転送量を減らし、処理性能を向上できる場合がある。ただし Map 処理でのメモリ使用量が増加するため、注意が必要。

ORDER BY と SORT BY

ORDER BY で出力結果全体をソートする場合、Reducer の数が一つなる。hive.mapred.mode=strict とすると、ORDER BY 句に LIMIT が必須となる。(nonstrict とすれば LIMIT 句は不要だが、適当なタイミングで処理が打ち切られる)

SORT BY を指定すると、複数の Reducer 内で処理結果をソートさせられる。ただし、結果全体でソートされたものとはならないが…

CLUSTER BY と DISTRIBUTE BY

DISTRIBUTE BY を付与すると、カラム値ごとに同じ Reducer へ処理が渡る。SORT BY と組み合わせれば、DISTRIBUTE BY で指定したカラムにおいて、ソートすることが可能。

SELECT col1, col2 FROM table1 DISTRIBUTE BY col1 SORT BY col2;

DISTRIBUTE BY, SORT BY 共に同じカラムの場合は、CLUSTER BY としてまとめることができる。

効果的な Hive の使い方(13.4節)

Hive でのチューニング(13.4.1節)

パーティショニングとJOIN

JOIN した後の WHERE だと、全走査したあとに絞り込むことになり、パーティション指定されない:

SELECT a.val, b.val
FROM a
LEFT OUTER JOIN b
ON (a.key=b.key)
WHERE a.ds='2013-01-01' AND b.ds='2013-01-01'

ON 句で条件を指定すれば、パーティション指定となる:

SELECT a.val, b.val
FROM a
LEFT OUTER JOIN b
ON (a.key=b.key AND a.ds='2013-01-01' AND b.ds='2013-01-01')

Map Join

JOIN の処理は Reducer で行われるが、これを Map 処理でやってしまおうというもの(分散キャッシュの機構を利用)。Shuffle, Reduce 処理が省かれ、非常に効率が良くなる。

被 JOIN 対象のテーブルをハッシュテーブルに読み込むため、メモリに乗る程度の小さなテーブルでないとだめ。

SELECT /*+ MAPJOIN(b) */ a.key, a.value
FROM a
JOIN b
ON a.key = b.key

/*+ MAPJOIN(b) */ はコメントではなく、明示的に Map Join を利用するヒント句。Hive 0.10 からは、ヒント句なしで Map Join を利用するように最適化されている。

Bucket Map Join と Sort Merge Join

メモリに乗る程度…という制約から、両テーブルがうまくバケット分割されている場合にも Map Join が利用可能。

具体的には:

  • 両テーブルが同じ?(★確認★)キーで CLUSTERED BY 指定されている
  • 両テーブルのバケット数が、どちらかの倍数になっている

更に、ソート済みでバケット数が両テーブルで一致している場合は、ソートマージジョインが利用可能。

Hive のインストール(6.5節)

cloudera のリポジトリから yum で入る

$ sudo -u hive hive
hive>

おまじない

$ sudo chown hive.hive derby.log

テーブル作成

CREATE TABLE tracking (
             member STRING,
             visit INT,
             page_view INT,
             date_str STRING,
             ip_addr STRING,
             site STRING,
             page STRING,
             referring_site STRING,
             referring_page STRING,
             referring_query STRING,
             user_agent STRING,
             os STRING,
             app_key STRING)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment