Skip to content

Instantly share code, notes, and snippets.

@hishidama
Created December 15, 2012 14:46
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hishidama/4295818 to your computer and use it in GitHub Desktop.
Save hishidama/4295818 to your computer and use it in GitHub Desktop.
irofさんによってmzpさんを量産するMapReduceプログラム
このエントリーは「いろふ Advent Calendar」(http://atnd.org/events/34079)ではなく、
Hadoopアドベントカレンダー2012(http://www.zusaar.com/event/437104)の14日目です。
Hadoopの分散キャッシュ(http://www.ne.jp/asahi/hishidama/home/tech/apache/hadoop/DistributedCache.html)のサンプルを作ってみました。
------------------------
サンプルプログラムの仕様
------------------------
概要: irofさんにmzpさん(雪うさぎ)を量産していただきます。
(→なんでこうなったかの経緯 http://blog.goo.ne.jp/hishidama/e/6f540e8e0cdd3978d4f63115695f1da2)
入力データは以下のような気象観測データ(CSVファイル)とします。
+---------------------------------------+-------------------------------+
|第1カラム |第2カラム |
+---------------------------------------+-------------------------------+
| 気象を観測した場所のコード(観測場所)| 積雪量(単位:センチメートル)|
+---------------------------------------+-------------------------------+
雪があればmzpさん(雪うさぎ)を作ることが出来ます。
ここでは、観測地点1箇所につき、積雪量1cm当たり10個のmzpさんが作れるものとします。
観測データは大量にあるはずなので、分散処理します。
irofさんは全国に沢山いるので、地域(観測場所)によって各irofさんに分担していただきます。
irofさん一覧は、いろふ図鑑(http://togetter.com/li/414143)から生成した、以下のようなCSVファイルとします。
+------------------------+----------------+
|第1カラム |第2カラム |
+------------------------+----------------+
| irofさんのアカウント名 | irofさんの名前 |
+------------------------+----------------+
最終的に、以下のような出力(TSVファイル)が得られます。
+----------------+-----------------------+
|第1カラム |第2カラム |
+----------------+-----------------------+
| irofさんの名前 | 作られたmzpさんの個数 |
+----------------+-----------------------+
package example;
import java.net.URI;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* irofさんによってmzpさんを量産するHadoopジョブ.
*/
public class CreateMzpByIrofJob extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int r = ToolRunner.run(new CreateMzpByIrofJob(), args);
System.exit(r);
}
@Override
public int run(String[] args) throws Exception {
Path input = new Path(args[0]);
Path output = new Path(args[1]);
// いろふ図鑑 http://togetter.com/li/414143
String irof = args[2];
Job job = new Job(getConf(), "create-mzp-by-irof");
job.setJarByClass(getClass());
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(CreateMzpMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, input);
FileOutputFormat.setOutputPath(job, output);
DistributedCache.addCacheFile(new URI(irof), job.getConfiguration());
boolean succeeded = job.waitForCompletion(true);
return succeeded ? 0 : 1;
}
}
package example;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class CreateMzpMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private List<String> irofList = new ArrayList<String>(20);
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
URI[] files = DistributedCache
.getCacheFiles(context.getConfiguration());
URI irof = files[0]; // 分散キャッシュを1つしか使っていないので、0番を決め打ち
Path path = new Path(irof);
FileSystem fs = path.getFileSystem(context.getConfiguration());
FSDataInputStream is = fs.open(path);
BufferedReader br = new BufferedReader(new InputStreamReader(is));
try {
for (;;) {
String s = br.readLine();
if (s == null) {
break;
}
String[] ss = s.split(",");
irofList.add(ss[1].trim());
}
} finally {
br.close();
}
}
private Text irof = new Text();
private IntWritable mzp = new IntWritable();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] ss = value.toString().split(",");
String 観測場所 = ss[0].trim();
int 積雪量 = Integer.parseInt(ss[1].trim());
// mzp作成を担当するirofの決定
irof.set(irofList.get((観測場所.hashCode() & Integer.MAX_VALUE) % irofList.size()));
// 作成できるmzpの個数
mzp.set(積雪量 * 10);
context.write(irof, mzp);
}
}
We can make this file beautiful and searchable if this error is corrected: It looks like row 4 should actually have 2 columns, instead of 1. in line 3.
pos1,1
pos2,2
pos3,3
irof いろふ
ayato_p あやろふ
NoriyukiMizuno みずのりろふ
Posaune まえかわひろふ
backpaper0 たいちろふ
inda_re いんだろふ
orange_clover おれんじイロフーバー
yukieen ゆきろふ
mike_neck 眠いろふ
toru_inoue さついろふ
tan_go たんごろふ
kimukou きむころふ
irof_history irof_history
mzp みずぴろふ
zer0_u ぜろふ
bleis ぶれいろふ
shuji_w6e しゅうじろふ
sue445 くりろふ
k_Okamoto おかもろふ
megascus ゆとろふ
We can make this file beautiful and searchable if this error is corrected: No tabs found in this TSV file in line 0.
irof_history 2370
あやろふ 2230
いろふ 2250
いんだろふ 2100
おかもろふ 2350
おれんじイロフーバー 2070
きむころふ 2250
くりろふ 2430
さついろふ 2130
しゅうじろふ 2410
ぜろふ 2450
たいちろふ 2130
たんごろふ 2230
ぶれいろふ 2480
まえかわひろふ 2070
みずのりろふ 2110
みずぴろふ 2410
ゆきろふ 2050
ゆとろふ 2350
眠いろふ 2130
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment