Skip to content

Instantly share code, notes, and snippets.

@jeff303
Created December 1, 2017 21:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jeff303/52ae74d4da6eb3140642d0a8d37e62c8 to your computer and use it in GitHub Desktop.
Save jeff303/52ae74d4da6eb3140642d0a8d37e62c8 to your computer and use it in GitHub Desktop.
import java.io.IOException;
import java.util.Iterator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
/** Tutorial1 */
public class Tutorial1 extends Configured implements Tool {
// The Mapper
public static class Map extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable> {
// Log levels to search for
private static final Pattern pattern =
Pattern.compile("(TRACE)|(DEBUG)|(INFO)|(WARN)|(ERROR)|(FATAL)");
private static final IntWritable accumulator = new IntWritable(1);
private Text logLevel = new Text();
public void map(
LongWritable key,
Text value,
OutputCollector<Text, IntWritable> collector,
Reporter reporter)
throws IOException {
// split on space, '[', and ']'
final String[] tokens = value.toString().split("[\\[\\]]");
if (tokens != null) {
// now find the log level token
for (final String token : tokens) {
final Matcher matcher = pattern.matcher(token);
// log level found
if (matcher.matches()) {
logLevel.set(token);
// Create the key value pairs
collector.collect(logLevel, accumulator);
}
}
}
}
}
// The Reducer
public static class Reduce extends MapReduceBase
implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(
Text key,
Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> collector,
Reporter reporter)
throws IOException {
int count = 0;
// code to aggregate the occurrence
while (values.hasNext()) {
count += values.next().get();
}
System.out.println(key + "\t" + count);
collector.collect(key, new IntWritable(count));
}
}
public int run(String[] args) throws Exception {
Configuration conf = getConf();
// Code to create a new Job specifying the MapReduce class
final JobConf job = new JobConf(Tutorial1.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(Map.class);
// Combiner is commented out – to be used in bonus activity
// job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setJarByClass(Tutorial1.class);
// File Input argument passed as a command line argument
FileInputFormat.setInputPaths(job, new Path(args[0]));
// File Output argument passed as a command line argument
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// statement to execute the job
JobClient.runJob(job);
return 0;
}
public static void main(String[] args) throws Exception { // Let ToolRunner handle generic command-line options
int res = ToolRunner.run(new Configuration(), new Tutorial1(), args);
System.exit(res);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment