Skip to content

Instantly share code, notes, and snippets.

@tf0054
Created March 22, 2012 14:44
Show Gist options
  • Save tf0054/2158726 to your computer and use it in GitHub Desktop.
Hack#11
import java.io.*;
import java.util.*;
import org.apache.hadoop.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.mapreduce.*;
/**
 * Word-count MapReduce job using the "in-mapper combining" pattern: instead of
 * emitting (token, 1) for every token, the mapper aggregates counts in a local
 * HashMap and only writes them to the context when the map exceeds
 * MAP_CAPACITY_LIMIT distinct tokens (and once more at cleanup). This reduces
 * the volume of intermediate pairs shuffled to the reducers.
 *
 * Usage: args[0] = input path on HDFS, args[1] = output path on HDFS.
 */
public class InMapperCombinerWordCount extends Configured implements Tool {

    private Configuration conf;
    private Job job;

    public InMapperCombinerWordCount() throws Exception {
        conf = new Configuration();
        job = new Job(conf);
    }

    private static class InMapperCombinerWordCountMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        // Local token -> count buffer (the in-mapper combiner state).
        private HashMap<String, IntWritable> hashMap = new HashMap<String, IntWritable>();
        // Flush the buffer once it holds more than this many distinct tokens.
        private static final int MAP_CAPACITY_LIMIT = 100;

        /**
         * Tokenizes one input line and accumulates counts into the local buffer.
         * The buffer is flushed whenever inserting a NEW token pushes it past
         * MAP_CAPACITY_LIMIT.
         */
        @Override
        public void map(LongWritable ikey, Text ival, Context context)
                throws IOException, InterruptedException {
            String line = ival.toString();
            StringTokenizer tokenizer =
                    new StringTokenizer(line, " ,.;:!\"'`()&?<>|\\-%$#*+[]");
            while (tokenizer.hasMoreTokens()) {
                String token = tokenizer.nextToken();
                // Single lookup instead of containsKey + get.
                IntWritable value = hashMap.get(token);
                if (value != null) {
                    value.set(value.get() + 1);
                } else {
                    hashMap.put(token, new IntWritable(1));
                    // BUG FIX: only inserting a new key grows the map, so the
                    // capacity check belongs here. The original performed it in
                    // the duplicate-token branch, where the size never changes,
                    // allowing unbounded growth on mostly-unique input.
                    if (hashMap.size() > MAP_CAPACITY_LIMIT) {
                        flush(context);
                    }
                }
            }
        }

        /** Emits whatever remains buffered at the end of the input split. */
        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            if (!hashMap.isEmpty()) {
                flush(context);
            }
        }

        /** Writes every buffered (token, count) pair to the context, then empties the buffer. */
        public void flush(Context context) throws IOException, InterruptedException {
            for (Map.Entry<String, IntWritable> entry : hashMap.entrySet()) {
                context.write(new Text(entry.getKey()), entry.getValue());
            }
            hashMap.clear();
        }
    }

    private static class InMapperCombinerWordCountReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        /** Sums the partial counts emitted for one token and writes the total. */
        @Override
        public void reduce(Text ikey, Iterable<IntWritable> ivals, Context context)
                throws IOException, InterruptedException {
            IntWritable sum = new IntWritable(0);
            for (IntWritable ival : ivals) {
                sum.set(sum.get() + ival.get());
            }
            context.write(ikey, sum);
        }
    }

    /**
     * Configures the job: name, jar, I/O formats, mapper/reducer classes, and
     * output key/value types.
     *
     * @param args args[0] is the input file to word-count; args[1] is the HDFS
     *             output directory.
     */
    public void init(String[] args) throws IOException {
        job.setJobName("InMapperCombinerWordCount");
        job.setJarByClass(InMapperCombinerWordCount.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Word-count the file named by the first program argument.
        TextInputFormat.addInputPath(job, new Path(args[0]));
        // Write results to the HDFS location named by the second argument.
        TextOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(InMapperCombinerWordCountMapper.class);
        job.setReducerClass(InMapperCombinerWordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
    }

    /** Tool entry point: configures the job and runs it to completion. */
    public int run(String[] args) throws Exception {
        init(args);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int returnCode = ToolRunner.run(new InMapperCombinerWordCount(), args);
        System.exit(returnCode);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment