Hack#11
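Word count with in-mapper combining: instead of emitting (word, 1) for every token, the mapper accumulates partial counts in a local HashMap and writes them out once the map exceeds a fixed capacity (and once more in cleanup). This cuts down the volume of intermediate data shuffled to the reducers, which then simply sum the partial counts per word.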
import java.io.*;
import java.util.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

public class InMapperCombinerWordCount extends Configured implements Tool {

    private Configuration conf;
    private Job job;

    public InMapperCombinerWordCount() throws Exception {
        conf = new Configuration();
        job = new Job(conf);
    }

    private static class InMapperCombinerWordCountMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        // Partial counts are buffered here instead of being emitted per token.
        private HashMap<String, IntWritable> hashMap = new HashMap<String, IntWritable>();
        // Flush the buffered counts once the map holds this many entries.
        private static final int MAP_CAPACITY_LIMIT = 100;

        @Override
        public void map(LongWritable ikey, Text ival, Context context)
                throws IOException, InterruptedException {
            String line = ival.toString();
            StringTokenizer tokenizer = new StringTokenizer(line, " ,.;:!\"'`()&?<>|\\-%$#*+[]");
            while (tokenizer.hasMoreTokens()) {
                String token = tokenizer.nextToken();
                if (hashMap.containsKey(token)) {
                    IntWritable value = hashMap.get(token);
                    value.set(value.get() + 1);
                } else {
                    hashMap.put(token, new IntWritable(1));
                }
                // Check the buffer size after every token; flushing only when an
                // existing key is updated would let a run of distinct tokens grow
                // the map past the limit without ever triggering a flush.
                if (hashMap.size() > MAP_CAPACITY_LIMIT) {
                    flush(context);
                }
            }
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // Emit whatever is still buffered when the mapper finishes its split.
            if (!hashMap.isEmpty()) {
                flush(context);
            }
        }

        public void flush(Context context) throws IOException, InterruptedException {
            Iterator<String> iterator = hashMap.keySet().iterator();
            while (iterator.hasNext()) {
                String key = iterator.next();
                IntWritable value = hashMap.get(key);
                context.write(new Text(key), value);
            }
            hashMap.clear();
        }
    }

    private static class InMapperCombinerWordCountReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text ikey, Iterable<IntWritable> ivals, Context context)
                throws IOException, InterruptedException {
            // Sum the partial counts emitted by the mappers' flushes.
            Iterator<IntWritable> iterator = ivals.iterator();
            IntWritable sum = new IntWritable(0);
            while (iterator.hasNext()) {
                sum.set(sum.get() + iterator.next().get());
            }
            context.write(ikey, sum);
        }
    }

    public void init(String[] args) throws IOException {
        job.setJobName("InMapperCombinerWordCount");
        job.setJarByClass(InMapperCombinerWordCount.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // Count the words in the file named by the program's first argument.
        TextInputFormat.addInputPath(job, new Path(args[0]));
        // The second argument names the output destination on HDFS.
        TextOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(InMapperCombinerWordCountMapper.class);
        job.setReducerClass(InMapperCombinerWordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
    }

    public int run(String[] args) throws Exception {
        init(args);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int returnCode = ToolRunner.run(new InMapperCombinerWordCount(), args);
        System.exit(returnCode);
    }
}
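A minimal sketch of how the job might be launched, assuming the class has been compiled and packaged on a configured Hadoop cluster (the jar name and paths here are hypothetical):

    hadoop jar inmapper-wordcount.jar InMapperCombinerWordCount input.txt wordcount-out

The first argument is the input file to count and the second is the HDFS output directory, which must not already exist when the job starts.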