Created
March 24, 2012 06:45
-
-
Save sinchii/2179126 to your computer and use it in GitHub Desktop.
sample code @ OSC2012 Tokyo/Spring
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
1. record file (record1.txt, record2.txt) | |
最初にHDFS上に格納する | |
2. master file (master.txt) | |
PigとHive利用時にはHDFS上に格納する（Pigデモは /user/yam/pig-master.txt として読み込む）
Java版はローカルディスクに置いておく | |
3. OSCSample.java | |
Java版ソースコード | |
4. Hive用ファイル | |
Hiveテーブル作成用スクリプト(hive-createtable.hiveql) | |
Hiveデモ用スクリプト(hive-demo.hiveql) | |
5. Pig用ファイル | |
Pigデモファイル (pig-demo.pig) | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Tables used by the OSC2012 Hive demo (comma-delimited text files).
-- record: user,team,address,score
-- `user` is back-quoted because USER is a reserved keyword in newer Hive releases.
create table if not exists record (
  `user` string,
  team string,
  address string,
  score int
)
row format delimited
fields terminated by ','
stored as TEXTFILE;

-- master: address,weight
create table if not exists master (
  address string,
  weight int
)
row format delimited
fields terminated by ','
stored as TEXTFILE;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Total of score * weight per team.
-- Records whose address has no row in master contribute 0 (left outer join + COALESCE).
-- score is widened to BIGINT so the product cannot overflow a 32-bit int.
select
  r.team,
  SUM(CAST(r.score AS BIGINT) * COALESCE(m.weight, 0))
from
  record r
  left outer join master m
    on r.address = m.address
group by
  r.team
;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
chiba,30 | |
tokyo,1 | |
hiroshima,10 | |
saitama,5 | |
tokorozawa,1 | |
fukuoka,20 | |
aichi,12 | |
urawa,2 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.hadoop; | |
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Scanner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class OSCSample extends Configured implements Tool { | |
private final static String master = "master"; | |
@Override | |
public int run(String[] args) throws Exception { | |
if (args.length != 3) { | |
System.err.println("Usage: OSCSample inputPath outputPath masterfile"); | |
return -1; | |
} | |
// マスターファイルのパス情報取得 | |
Path mPath = new Path(args[2]); | |
// Hadoopの設定情報取得 | |
Configuration conf = getConf(); | |
// ファイルシステム情報取得 | |
FileSystem fs = FileSystem.get(conf); | |
// マスターファイルを格納 | |
if (fs.exists(mPath)) { | |
fs.delete(mPath, true); | |
} | |
fs.copyFromLocalFile(mPath, mPath); | |
// DistributedCacheの設定 | |
URI uri = new URI(mPath.toString() + "#" + master); | |
DistributedCache.addCacheFile(uri, conf); | |
DistributedCache.createSymlink(conf); | |
// ジョブ情報設定 | |
Job job = new Job(conf); | |
// ジョブ名設定 | |
job.setJobName(getClass().getName()); | |
// ジョブ実行用クラス設定 | |
job.setJarByClass(getClass()); | |
// 入力パス設定 | |
FileInputFormat.setInputPaths(job, args[0]); | |
job.setInputFormatClass(TextInputFormat.class); | |
// Mapクラス設定 | |
job.setMapperClass(OSCSampleMapper.class); | |
// Map出力KV設定 | |
job.setMapOutputKeyClass(Text.class); | |
job.setMapOutputValueClass(LongWritable.class); | |
// Reduceクラス設定 | |
job.setReducerClass(OSCSampleReducer.class); | |
// Reduce処理数設定 | |
job.setNumReduceTasks(1); | |
// 出力パス設定 | |
Path outputPath = new Path(args[1]); | |
if (fs.exists(outputPath)) { | |
fs.delete(outputPath, true); | |
} | |
FileOutputFormat.setOutputPath(job, outputPath); | |
job.setOutputFormatClass(TextOutputFormat.class); | |
// 出力KV設定 | |
job.setOutputKeyClass(Text.class); | |
job.setOutputValueClass(LongWritable.class); | |
// ジョブ実行 | |
return (job.waitForCompletion(true)) ? 0 : 1; | |
} | |
/** | |
* @param args | |
* @throws Exception | |
*/ | |
public static void main(String[] args) throws Exception { | |
int result = ToolRunner.run(new Configuration(), new OSCSample(), args); | |
System.exit(result); | |
} | |
public static class OSCSampleMapper extends Mapper<LongWritable, Text, Text, LongWritable> { | |
private HashMap<String, Integer> map = new HashMap<String, Integer>(); | |
private final static String master = "master"; | |
private final static int ID_POS = 0; | |
private final static int VAL_POS = 1; | |
/** | |
* DistributedCacheによりマスターデータをHashMapに格納 | |
*/ | |
@Override | |
public void setup(Context context) | |
throws IOException, InterruptedException { | |
Scanner scan = new Scanner(new File(master)); | |
scan.useDelimiter("\n"); | |
while (scan.hasNext()) { | |
// マスターデータは、Address,Rate である | |
String[] tmp = scan.next().split(","); | |
if (tmp.length == 2) { | |
map.put(tmp[ID_POS], Integer.valueOf(tmp[VAL_POS])); | |
} | |
} | |
} | |
private final static int TEAM_POS = 1; | |
private final static int ADDRESS_POS = 2; | |
private final static int SCORE_POS = 3; | |
private Text newKey = new Text(); | |
private LongWritable newValue = new LongWritable(); | |
/** | |
* マスターデータと入力データを紐付けるmap処理 | |
*/ | |
@Override | |
public void map(LongWritable key, Text value, Context context) | |
throws IOException, InterruptedException { | |
// Mapの入力は、Name,Team,Address,Score である | |
String[] tmp = value.toString().split(","); | |
// ADDRESS_POSがマスターデータにあれば、、、加算 | |
if (map.containsKey(tmp[ADDRESS_POS])) { | |
// newValue <- SCORE * RATE | |
newValue.set(Integer.parseInt(tmp[SCORE_POS]) * map.get(tmp[ADDRESS_POS])); | |
} else { | |
// 無ければ、、、残念 | |
newValue.set(0); | |
} | |
newKey.set(tmp[TEAM_POS]); | |
context.write(newKey, newValue); | |
} | |
} | |
/** | |
* Keyに紐づく値を合算するReducer | |
* @author yam | |
* | |
*/ | |
public static class OSCSampleReducer extends Reducer<Text, LongWritable, Text, LongWritable> { | |
private LongWritable newValue = new LongWritable(); | |
/** | |
* map処理結果よりKeyで値を合算 | |
*/ | |
@Override | |
public void reduce(Text key, Iterable<LongWritable> values, Context context) | |
throws IOException, InterruptedException { | |
long l = 0; | |
for (LongWritable v : values) { | |
l += v.get(); | |
} | |
newValue.set(l); | |
context.write(key, newValue); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Total of score * weight per team; records whose address is missing from the master
-- contribute 0. Paths can be overridden with: -param INPUT=... -param MASTER=...
%default INPUT '/user/yam/input/'
%default MASTER '/user/yam/pig-master.txt'

R = LOAD '$INPUT' USING PigStorage(',') AS (user:chararray, team:chararray, address:chararray, score:int);
M = LOAD '$MASTER' USING PigStorage(',') AS (address:chararray, weight:int);
-- Fragment-replicate join: the right-hand relation (M) must be small enough to fit in memory.
J = JOIN R BY address LEFT OUTER, M BY address USING 'replicated';
-- Widen score to long before multiplying so score * weight cannot overflow an int.
F = FOREACH J GENERATE R::team AS team, (M::weight IS NOT NULL ? ((long)R::score) * M::weight : 0L) AS points;
G = GROUP F BY team;
RESULT = FOREACH G GENERATE group, SUM(F.points);
DUMP RESULT;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
user001,marines,chiba,20 | |
user002,lions,saitama,50 | |
user003,carp,hiroshima,10 | |
user004,hawks,fukuoka,5 | |
user007,dragons,aichi,30 | |
user002,giants,tokyo,100 | |
user005,marines,chiba,30 | |
user003,marines,chiba,100 | |
user010,giants,tokyo,20 | |
user006,carp,hiroshima,20 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
user110,marines,sendai,1 | |
user124,fighters,kamagaya,5 | |
user300,marines,urawa,1 | |
user940,carp,hakata,20 | |
user777,dragons,aichi,30 | |
user220,hawks,tokyo,200 | |
user151,dragons,nagoya,30 | |
user261,lions,tokorozawa,1000 | |
user246,marines,chiba,10 | |
user889,dragons,aichi,50 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment