@airawat
Last active September 18, 2016 09:36
00-JavaMapperReducerUsingRegex
This gist includes a mapper, reducer, and driver in Java that parse log files using a
regex. The code for the combiner is the same as the reducer.
Use case: Count the number of occurrences of processes that got logged, inception to date.
Includes:
---------
Sample data and scripts for download: 01-ScriptAndDataDownload
Sample data and structure: 02-SampleDataAndStructure
Mapper: 03-LogEventCountMapper.java
Reducer: 04-LogEventCountReducer.java
Driver: 05-LogEventCount.java
Commands: 06-Commands
Sample output: 07-Output
Code files and data download
-----------------------------
The code files, and sample data are available for download at:
https://groups.google.com/forum/?hl=en&fromgroups#!topic/hadooped/0KSGUAREb84
The following is the directory structure (directories are suffixed with "/"):

LogParserSample/
    logs/
        airawat-syslog/
            2013/
                04/
                    messages
                05/
                    messages
    JavaProgram/
        LogAnalysisEventLevel/src/Airawat/Oozie/Samples/
            LogEventCount.java
            LogEventCountMapper.java
            LogEventCountReducer.java
    Application/
        lib/
            LogEventCount.jar
Sample data
------------
May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal
May 3 11:53:31 cdh-dn03 kernel: registered taskstats version 1
May 3 11:53:31 cdh-dn03 kernel: sr0: scsi3-mmc drive: 32x/32x xa/form2 tray
May 3 11:53:31 cdh-dn03 kernel: piix4_smbus 0000:00:07.0: SMBus base address uninitialized - upgrade BIOS or use force_addr=0xaddr
May 3 11:53:31 cdh-dn03 kernel: nf_conntrack version 0.5.0 (7972 buckets, 31888 max)
May 3 11:53:57 cdh-dn03 kernel: hrtimer: interrupt took 11250457 ns
May 3 11:53:59 cdh-dn03 ntpd_initres[1705]: host name not found: 0.rhel.pool.ntp.org
Structure
----------
Month = May
Day = 3
Time = 11:52:54
Node = cdh-dn03
Process = init:
Log msg = tty (/dev/tty6) main process (1208) killed by TERM signal
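
The parse above can be sanity-checked outside Hadoop with a small standalone class that applies the same regex the mapper (shown further below) uses. `SyslogRegexDemo` is a hypothetical class name used only for illustration:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SyslogRegexDemo {
    // Same pattern the mapper uses: month, day, time, node, process, message
    static final Pattern LOG_PATTERN = Pattern.compile(
        "(\\w+)\\s+(\\d+)\\s+(\\d+:\\d+:\\d+)\\s+(\\w+\\W*\\w*)\\s+(.*?\\:)\\s+(.*$)");

    public static void main(String[] args) {
        String line = "May 3 11:52:54 cdh-dn03 init: "
                + "tty (/dev/tty6) main process (1208) killed by TERM signal";
        Matcher m = LOG_PATTERN.matcher(line);
        if (m.matches()) {
            System.out.println("Month   = " + m.group(1));  // May
            System.out.println("Day     = " + m.group(2));  // 3
            System.out.println("Time    = " + m.group(3));  // 11:52:54
            System.out.println("Node    = " + m.group(4));  // cdh-dn03
            System.out.println("Process = " + m.group(5));  // init:
            System.out.println("Log msg = " + m.group(6));
        } else {
            System.err.println("Line did not match the pattern");
        }
    }
}
```

Note that group 5 (`.*?\:`) is a lazy match up to the first colon, which is what isolates the process name (e.g. "init:") rather than swallowing later colons in the message.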
Commands to load data and run mapreduce program
-------------------------------------------------
a) Load the data
$ hadoop fs -mkdir LogParserSample
$ hadoop fs -mkdir LogParserSample/logs
$ hadoop fs -put LogParserSample/logs/* LogParserSample/logs/
$ hadoop fs -ls -R LogParserSample/ | awk '{print $8}'
LogParserSample/logs
LogParserSample/logs/airawat-syslog
LogParserSample/logs/airawat-syslog/2013
LogParserSample/logs/airawat-syslog/2013/04
LogParserSample/logs/airawat-syslog/2013/04/messages
LogParserSample/logs/airawat-syslog/2013/05
LogParserSample/logs/airawat-syslog/2013/05/messages
b) Run the map reduce program
$ hadoop jar LogParserSample/Application/lib/LogEventCount.jar Airawat.Oozie.Samples.LogEventCount LogParserSample/logs/*/*/*/ LogParserSample/myCLIOutput
c) Results:
$ hadoop fs -ls -R LogParserSample | awk '{print $8}'
LogParserSample/logs
LogParserSample/logs/airawat-syslog
LogParserSample/logs/airawat-syslog/2013
LogParserSample/logs/airawat-syslog/2013/04
LogParserSample/logs/airawat-syslog/2013/04/messages
LogParserSample/logs/airawat-syslog/2013/05
LogParserSample/logs/airawat-syslog/2013/05/messages
LogParserSample/myCLIOutput
LogParserSample/myCLIOutput/_SUCCESS
LogParserSample/myCLIOutput/_logs
LogParserSample/myCLIOutput/_logs/history
LogParserSample/myCLIOutput/_logs/history/cdh-jt01_1371140867475_job_201306131127_0177_conf.xml
LogParserSample/myCLIOutput/_logs/history/job_201306131127_0177_1371573190526_airawat_Syslog+Event+Rollup
LogParserSample/myCLIOutput/part-r-00000
d) View output
$ hadoop fs -cat LogParserSample/myCLIOutput/part-r-00000
//Mapper: LogEventCountMapper.java
package Airawat.Oozie.Samples;

import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * The following is the code for the mapper class:
 */
public class LogEventCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    public static final int NUM_FIELDS = 6;

    // Compile the pattern once, rather than on every call to map().
    private static final Pattern LOG_ENTRY_PATTERN = Pattern.compile(
        "(\\w+)\\s+(\\d+)\\s+(\\d+:\\d+:\\d+)\\s+(\\w+\\W*\\w*)\\s+(.*?\\:)\\s+(.*$)");

    private static final IntWritable ONE = new IntWritable(1);
    private final Text strEvent = new Text("");

    /*
     * The map method runs once for each line of text in the input file.
     * The method receives a key of type LongWritable, a value of type
     * Text, and a Context object.
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String strLogEntryLine = value.toString();
        Matcher objPatternMatcher = LOG_ENTRY_PATTERN.matcher(strLogEntryLine);

        if (!objPatternMatcher.matches() || NUM_FIELDS != objPatternMatcher.groupCount()) {
            System.err.println("Bad log entry (or problem with the regex?):");
            System.err.println(strLogEntryLine);
            return;
        }

        // Groups: 1 = month, 2 = day, 3 = time, 4 = node,
        //         5 = process, 6 = log message
        strEvent.set(objPatternMatcher.group(5));

        /*
         * Call the write method on the Context object to emit a key
         * and a value from the map method.
         */
        context.write(strEvent, ONE);
    }
}
//Reducer: LogEventCountReducer.java
package Airawat.Oozie.Samples;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/*
 * The following is the code for the reducer class:
 */
public class LogEventCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int intEventCount = 0;

        /*
         * For each value in the set of values passed by the mapper,
         * add the value to the SysLog event counter for this key.
         */
        for (IntWritable value : values) {
            intEventCount += value.get();
        }

        /*
         * Call the write method on the Context object to emit a key
         * and a value from the reduce method.
         */
        context.write(key, new IntWritable(intEventCount));
    }
}
//Driver: LogEventCount.java
package Airawat.Oozie.Samples;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * The following is the code for the driver class:
 */
public class LogEventCount {

    public static void main(String[] args) throws Exception {
        /*
         * The expected command-line arguments are the paths containing
         * input and output data. Terminate the job if the number of
         * command-line arguments is not exactly 2.
         */
        if (args.length != 2) {
            System.out.printf("Usage: LogEventCount <input dir> <output dir>\n");
            System.exit(-1);
        }

        /*
         * Instantiate a Job object for your job's configuration.
         */
        Job job = new Job();

        /*
         * Specify the jar file that contains your driver, mapper, and reducer.
         * Hadoop will transfer this jar file to nodes in your cluster running
         * mapper and reducer tasks.
         */
        job.setJarByClass(LogEventCount.class);

        /*
         * Specify an easily decipherable name for the job.
         * This job name will appear in reports and logs.
         */
        job.setJobName("Syslog Event Rollup");

        /*
         * Specify the paths to the input and output data based on the
         * command-line arguments.
         */
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        /*
         * Specify the mapper and reducer classes.
         */
        job.setMapperClass(LogEventCountMapper.class);
        job.setReducerClass(LogEventCountReducer.class);

        /*
         * For this application, the mapper's output keys and values have
         * the same data types as the reducer's output keys and values:
         * Text and IntWritable. When they differ, you must also call the
         * setMapOutputKeyClass and setMapOutputValueClass methods.
         *
         * Specify the job's output key and value classes.
         */
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setNumReduceTasks(1);

        /*
         * Start the MapReduce job and wait for it to finish.
         * If it finishes successfully, exit with 0; otherwise 1.
         */
        boolean success = job.waitForCompletion(true);
        System.exit(success ? 0 : 1);
    }
}
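
The intro notes that the combiner code is identical to the reducer. To actually enable map-side pre-aggregation, the reducer class would be registered as the combiner in the driver; this single line is not in the original driver above, but is a sketch of how it would be added before `waitForCompletion`:

```java
// Reuse the reducer as a combiner to pre-aggregate counts on the map side.
// Safe here because summing is associative and commutative, and the
// combiner's input and output types match (Text, IntWritable).
job.setCombinerClass(LogEventCountReducer.class);
```

This reduces the volume of intermediate data shuffled to the reducer, since each map task emits one partial count per process instead of one record per matching log line.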
Output of mapreduce program
----------------------------
init: 23
kernel: 58
ntpd_initres[1705]: 792
sudo: 2
udevd[361]: 1
airawat commented Jul 15, 2013

The mapper at the gist at the link below does a little more than the mapper detailed above.

  1. It strips off the session id.
  2. It gets the directory path passed to the mapper, extracts the year, and includes it in the key emitted.

https://gist.github.com/airawat/5961360
