Skip to content

Instantly share code, notes, and snippets.

@sinchii
Created March 24, 2012 06:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sinchii/2179126 to your computer and use it in GitHub Desktop.
Save sinchii/2179126 to your computer and use it in GitHub Desktop.
sample code @ OSC2012 Tokyo/Spring
1. record file (record1.txt, record2.txt)
最初にHDFS上に格納する
2. master file (master.txt)
PigとHive利用時にはHDFS上に格納する
Java版はローカルディスクに置いておく
3. OSCSample.java
Java版ソースコード
4. Hive用ファイル
Hiveテーブル作成用スクリプト(hive-createtable.hiveql)
Hiveデモ用スクリプト(hive-demo.hiveql)
5. Pig用ファイル
Pigデモファイル (pig-demo.pig)
-- Fact table "record": one row per CSV line of record*.txt on HDFS.
-- Columns: user name, team, address, score.
create table if not exists record (
user string,
team string,
address string,
score int
)
row format delimited
fields terminated by ','
stored as TEXTFILE;
-- Dimension table "master": per-address weight used to scale scores
-- (CSV lines of "address,weight" from master.txt on HDFS).
create table if not exists master (
address string,
weight int
)
row format delimited
fields terminated by ','
stored as TEXTFILE;
-- Total weighted score per team: left-outer-join each record with the
-- master table on address, treat a missing master row as weight 0, then
-- sum score * weight grouped by team.
select
team,
SUM(score * weight)
from (
select
r.team,
r.score,
-- NULL weight means the address had no master row; count it as 0.
if (m.weight IS NULL, 0, m.weight) weight
from
record r
left outer join
master m
on
r.address = m.address
) tmp
group by
team
;
chiba,30
tokyo,1
hiroshima,10
saitama,5
tokorozawa,1
fukuoka,20
aichi,12
urawa,2
package com.example.hadoop;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Scanner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class OSCSample extends Configured implements Tool {

    /** Symlink name under which the master file is published via the DistributedCache. */
    private final static String master = "master";

    /**
     * Configures and submits a map-side-join job: each mapper loads the
     * master data (address -&gt; weight) from the DistributedCache, multiplies
     * every record's score by the matching weight, and the reducer sums the
     * weighted scores per team.
     *
     * @param args {inputPath, outputPath, masterfile}
     * @return 0 on success, 1 on job failure, -1 on bad usage
     * @throws Exception on any HDFS or job-submission error
     */
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println("Usage: OSCSample inputPath outputPath masterfile");
            return -1;
        }
        // Master file path; the same path string is used as both the local
        // source and the HDFS destination of the copy below.
        Path mPath = new Path(args[2]);
        // Hadoop configuration
        Configuration conf = getConf();
        // File system handle
        FileSystem fs = FileSystem.get(conf);
        // Upload the master file to HDFS, replacing any stale copy.
        if (fs.exists(mPath)) {
            fs.delete(mPath, true);
        }
        fs.copyFromLocalFile(mPath, mPath);
        // Register the file in the DistributedCache under the symlink name
        // "master" so each mapper can open it as a plain local file.
        URI uri = new URI(mPath.toString() + "#" + master);
        DistributedCache.addCacheFile(uri, conf);
        DistributedCache.createSymlink(conf);
        // Job setup
        Job job = new Job(conf);
        job.setJobName(getClass().getName());
        job.setJarByClass(getClass());
        // Input
        FileInputFormat.setInputPaths(job, args[0]);
        job.setInputFormatClass(TextInputFormat.class);
        // Map phase
        job.setMapperClass(OSCSampleMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Reduce phase (a single reducer yields one result file)
        job.setReducerClass(OSCSampleReducer.class);
        job.setNumReduceTasks(1);
        // Output; an existing output directory must be removed or the job
        // would fail at submission time.
        Path outputPath = new Path(args[1]);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Run synchronously
        return (job.waitForCompletion(true)) ? 0 : 1;
    }

    /**
     * @param args inputPath outputPath masterfile
     * @throws Exception propagated from {@link #run(String[])}
     */
    public static void main(String[] args) throws Exception {
        int result = ToolRunner.run(new Configuration(), new OSCSample(), args);
        System.exit(result);
    }

    /**
     * Mapper that joins each input record against the cached master data
     * and emits (team, score * weight).
     */
    public static class OSCSampleMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        // address -> weight, loaded once per task from the DistributedCache
        private HashMap<String, Integer> map = new HashMap<String, Integer>();
        private final static String master = "master";
        private final static int ID_POS = 0;
        private final static int VAL_POS = 1;

        /**
         * Loads the master data (CSV lines of "address,weight") into the
         * in-memory map via the DistributedCache symlink.
         */
        @Override
        public void setup(Context context)
                throws IOException, InterruptedException {
            Scanner scan = new Scanner(new File(master));
            try {
                scan.useDelimiter("\n");
                while (scan.hasNext()) {
                    // Each master line is: Address,Rate
                    String[] tmp = scan.next().split(",");
                    if (tmp.length == 2) {
                        map.put(tmp[ID_POS], Integer.valueOf(tmp[VAL_POS]));
                    }
                }
            } finally {
                // FIX: the original never closed the Scanner, leaking the
                // underlying file handle in every map task.
                scan.close();
            }
        }

        private final static int TEAM_POS = 1;
        private final static int ADDRESS_POS = 2;
        private final static int SCORE_POS = 3;
        private Text newKey = new Text();
        private LongWritable newValue = new LongWritable();

        /**
         * Emits (team, score * weight); the weight defaults to 0 when the
         * record's address has no entry in the master data (mirrors the
         * Hive demo's LEFT OUTER JOIN with IF(weight IS NULL, 0, weight)).
         */
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input lines are: Name,Team,Address,Score
            String[] tmp = value.toString().split(",");
            // FIX: skip malformed/short lines instead of failing the task
            // with an ArrayIndexOutOfBoundsException.
            if (tmp.length <= SCORE_POS) {
                return;
            }
            if (map.containsKey(tmp[ADDRESS_POS])) {
                // Widen to long before multiplying to avoid int overflow.
                newValue.set((long) Integer.parseInt(tmp[SCORE_POS]) * map.get(tmp[ADDRESS_POS]));
            } else {
                // No matching address: contributes nothing to the sum.
                newValue.set(0);
            }
            newKey.set(tmp[TEAM_POS]);
            context.write(newKey, newValue);
        }
    }

    /**
     * Reducer that sums all weighted scores belonging to one team key.
     */
    public static class OSCSampleReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        private LongWritable newValue = new LongWritable();

        /**
         * Sums the values for a key and emits (team, total).
         */
        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long l = 0;
            for (LongWritable v : values) {
                l += v.get();
            }
            newValue.set(l);
            context.write(key, newValue);
        }
    }
}
-- Pig demo: total weighted score per team, same logic as the Hive demo.
-- R: input records (user, team, address, score).
R = LOAD '/user/yam/input/' USING PigStorage(',') AS (user:chararray, team:chararray, address:chararray, score:int);
-- M: master data mapping address -> weight.
M = LOAD '/user/yam/pig-master.txt' USING PigStorage(',') AS (address:chararray, weight:int);
-- LEFT OUTER join; 'replicated' makes it a map-side (fragment-replicate)
-- join, requiring the master relation to fit in memory.
J = JOIN R by address LEFT OUTER, M BY address USING 'replicated';
-- $1 = team; score * weight, or 0 when no master row matched the address.
F = FOREACH J GENERATE $1, ($5 is not NULL ? $3*$5 : 0);
G = GROUP F BY $0;
-- Sum the weighted scores within each team group.
RESULT = FOREACH G GENERATE group, SUM(F.$1);
DUMP RESULT;
user001,marines,chiba,20
user002,lions,saitama,50
user003,carp,hiroshima,10
user004,hawks,fukuoka,5
user007,dragons,aichi,30
user002,giants,tokyo,100
user005,marines,chiba,30
user003,marines,chiba,100
user010,giants,tokyo,20
user006,carp,hiroshima,20
user110,marines,sendai,1
user124,fighters,kamagaya,5
user300,marines,urawa,1
user940,carp,hakata,20
user777,dragons,aichi,30
user220,hawks,tokyo,200
user151,dragons,nagoya,30
user261,lions,tokorozawa,1000
user246,marines,chiba,10
user889,dragons,aichi,50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment