Last active
May 2, 2017 02:11
-
-
Save brun0xff/6320bbe0f9e4f1248cc2d21192e98150 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Bespin: reference implementations of "big data" algorithms
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
//package io.bespin.java.mapreduce.wordcount; | |
package br.edu.ufam.brun0xff; | |
import io.bespin.java.util.Tokenizer; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.conf.Configured; | |
import org.apache.hadoop.fs.FileSystem; | |
import org.apache.hadoop.fs.Path; | |
import org.apache.hadoop.io.IntWritable; | |
import org.apache.hadoop.io.LongWritable; | |
import org.apache.hadoop.io.Text; | |
import org.apache.hadoop.mapreduce.Job; | |
import org.apache.hadoop.mapreduce.Mapper; | |
import org.apache.hadoop.mapreduce.Reducer; | |
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; | |
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; | |
import org.apache.hadoop.util.Tool; | |
import org.apache.hadoop.util.ToolRunner; | |
import org.apache.log4j.Logger; | |
import org.kohsuke.args4j.CmdLineException; | |
import org.kohsuke.args4j.CmdLineParser; | |
import org.kohsuke.args4j.Option; | |
import org.kohsuke.args4j.ParserProperties; | |
import java.io.IOException; | |
import java.util.HashMap; | |
import java.util.Iterator; | |
import java.util.Map; | |
/** | |
* Simple word count demo. | |
*/ | |
public class WordCount extends Configured implements Tool { | |
private static final Logger LOG = Logger.getLogger(WordCount.class); | |
// Mapper: emits (token, 1) for every word occurrence. | |
public static final class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> { | |
// Reuse objects to save overhead of object creation. | |
private static final IntWritable ONE = new IntWritable(1); | |
private static final Text WORD = new Text(); | |
@Override | |
public void map(LongWritable key, Text value, Context context) | |
throws IOException, InterruptedException { | |
for (String word : Tokenizer.tokenize(value.toString())) { | |
WORD.set(word); | |
context.write(WORD, ONE); | |
} | |
} | |
} | |
public static final class MyMapperIMC extends Mapper<LongWritable, Text, Text, IntWritable> { | |
private Map<String, Integer> counts; | |
@Override | |
public void setup(Context context) throws IOException, InterruptedException { | |
counts = new HashMap<>(); | |
} | |
@Override | |
public void map(LongWritable key, Text value, Context context) | |
throws IOException, InterruptedException { | |
for (String word : Tokenizer.tokenize(value.toString())) { | |
if (counts.containsKey(word)) { | |
counts.put(word, counts.get(word)+1); | |
} else { | |
counts.put(word, 1); | |
} | |
} | |
} | |
@Override | |
public void cleanup(Context context) throws IOException, InterruptedException { | |
IntWritable cnt = new IntWritable(); | |
Text token = new Text(); | |
for (Map.Entry<String, Integer> entry : counts.entrySet()) { | |
token.set(entry.getKey()); | |
cnt.set(entry.getValue()); | |
context.write(token, cnt); | |
} | |
} | |
} | |
// Reducer: sums up all the counts. | |
public static final class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> { | |
// Reuse objects. | |
private static final IntWritable SUM = new IntWritable(); | |
@Override | |
public void reduce(Text key, Iterable<IntWritable> values, Context context) | |
throws IOException, InterruptedException { | |
// Sum up values. | |
Iterator<IntWritable> iter = values.iterator(); | |
int sum = 0; | |
while (iter.hasNext()) { | |
sum += iter.next().get(); | |
} | |
SUM.set(sum); | |
context.write(key, SUM); | |
} | |
} | |
/** | |
* Creates an instance of this tool. | |
*/ | |
private WordCount() {} | |
private static final class Args { | |
@Option(name = "-input", metaVar = "[path]", required = true, usage = "input path") | |
String input; | |
@Option(name = "-output", metaVar = "[path]", required = true, usage = "output path") | |
String output; | |
@Option(name = "-reducers", metaVar = "[num]", usage = "number of reducers") | |
int numReducers = 1; | |
@Option(name = "-imc", usage = "use in-mapper combining") | |
boolean imc = false; | |
} | |
/** | |
* Runs this tool. | |
*/ | |
@Override | |
public int run(String[] argv) throws Exception { | |
final Args args = new Args(); | |
CmdLineParser parser = new CmdLineParser(args, ParserProperties.defaults().withUsageWidth(100)); | |
try { | |
parser.parseArgument(argv); | |
} catch (CmdLineException e) { | |
System.err.println(e.getMessage()); | |
parser.printUsage(System.err); | |
return -1; | |
} | |
LOG.info("Tool: " + WordCount.class.getSimpleName()); | |
LOG.info(" - input path: " + args.input); | |
LOG.info(" - output path: " + args.output); | |
LOG.info(" - number of reducers: " + args.numReducers); | |
LOG.info(" - use in-mapper combining: " + args.imc); | |
Configuration conf = getConf(); | |
Job job = Job.getInstance(conf); | |
job.setJobName(WordCount.class.getSimpleName()); | |
job.setJarByClass(WordCount.class); | |
job.setNumReduceTasks(args.numReducers); | |
FileInputFormat.setInputPaths(job, new Path(args.input)); | |
FileOutputFormat.setOutputPath(job, new Path(args.output)); | |
job.setMapOutputKeyClass(Text.class); | |
job.setMapOutputValueClass(IntWritable.class); | |
job.setOutputKeyClass(Text.class); | |
job.setOutputValueClass(IntWritable.class); | |
job.setOutputFormatClass(TextOutputFormat.class); | |
job.setMapperClass(args.imc ? MyMapperIMC.class : MyMapper.class); | |
job.setCombinerClass(MyReducer.class); | |
job.setReducerClass(MyReducer.class); | |
// Delete the output directory if it exists already. | |
Path outputDir = new Path(args.output); | |
FileSystem.get(conf).delete(outputDir, true); | |
long startTime = System.currentTimeMillis(); | |
job.waitForCompletion(true); | |
LOG.info("Job Finished in " + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds"); | |
return 0; | |
} | |
/** | |
* Dispatches command-line arguments to the tool via the {@code ToolRunner}. | |
* | |
* @param args command-line arguments | |
* @throws Exception if tool encounters an exception | |
*/ | |
public static void main(String[] args) throws Exception { | |
ToolRunner.run(new WordCount(), args); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment