Last active
August 29, 2015 14:14
-
-
Save soulmachine/a8130aa4983f2036b3bf to your computer and use it in GitHub Desktop.
Polished code based on http://wiki.apache.org/hadoop/WordCount
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package me.soulmachine; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.conf.Configured; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.io.IntWritable; | |
import org.apache.hadoop.io.LongWritable; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Job; | |
import org.apache.hadoop.mapreduce.Mapper; | |
import org.apache.hadoop.mapreduce.Reducer; | |
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; | |
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; | |
import org.apache.hadoop.util.Tool; | |
import org.apache.hadoop.util.ToolRunner; | |
import java.io.IOException; | |
import java.util.StringTokenizer; | |
public class WordCount extends Configured implements Tool { | |
public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> { | |
private static final IntWritable ONE = new IntWritable(1); | |
private final transient Text word = new Text(); | |
@Override public void map(final LongWritable key, final Text value, final Context context) | |
throws IOException, InterruptedException { | |
final String line = value.toString(); | |
final StringTokenizer tokenizer = new StringTokenizer(line); | |
while (tokenizer.hasMoreTokens()) { | |
word.set(tokenizer.nextToken()); | |
context.write(word, ONE); | |
} | |
} | |
} | |
public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> { | |
@Override | |
public void reduce(final Text key, final Iterable<IntWritable> values, final Context context) | |
throws IOException, InterruptedException { | |
int sum = 0; | |
for (final IntWritable val : values) { | |
sum += val.get(); | |
} | |
context.write(key, new IntWritable(sum)); | |
} | |
} | |
@Override public int run(final String[] args) throws Exception { | |
final Configuration conf = this.getConf(); | |
final Job job = Job.getInstance(conf, "Word Count"); | |
job.setJarByClass(WordCount.class); | |
job.setMapperClass(MyMapper.class); | |
job.setReducerClass(MyReducer.class); | |
job.setOutputKeyClass(Text.class); | |
job.setOutputValueClass(IntWritable.class); | |
job.setInputFormatClass(TextInputFormat.class); | |
job.setOutputFormatClass(TextOutputFormat.class); | |
FileInputFormat.addInputPath(job, new Path(args[0])); | |
FileOutputFormat.setOutputPath(job, new Path(args[1])); | |
return job.waitForCompletion(true) ? 0 : 1; | |
} | |
public static void main(final String[] args) throws Exception { | |
final int returnCode = ToolRunner.run(new Configuration(), new WordCount(), args); | |
System.exit(returnCode); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment