LogParserInCascading
====================
About this gist:
================
This gist is part of a series of log parsers in Java MapReduce, Pig, Hive, Python...
This one covers a log parser in Cascading.
It reads syslogs in HDFS and:
a) Parses them based on a regex pattern & writes the parsed files to HDFS
b) Writes records that don't match the pattern to HDFS
c) Writes a report to HDFS that contains the count of distinct processes logged.
Other gists/blogs:
==================
1) Log parsing - Java Mapreduce
http://hadooped.blogspot.com/2013/07/log-parsing-in-hadoop-part-1-java-using.html
2) Log parsing - Pig
http://hadooped.blogspot.com/2013/07/log-parsing-in-hadoop-part-3-pig-latin.html
3) Log parsing - Hive
http://hadooped.blogspot.com/2013/07/log-parsing-in-hadoop-part-2-hive-using.html
4) Log parsing - Python/streaming
http://hadooped.blogspot.com/2013/07/log-parsing-in-hadoop-part-1-python.html
TODOs:
======
The year is not in the syslogs, but is part of the file path (it's a directory).
Figure out how to extract the year from the file path, in Cascading, and add it as a field.
https://groups.google.com/forum/#!msg/cascading-user/1llgrXOL69Q/BzzPJRbpQ_MJ
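One possible approach (an untested sketch, not from this gist; FilePathYearFunction is a made-up name): on the Hadoop platform, the path of the split currently being read is available from the JobConf as "map.input.file", so a custom Function could emit the year segment of that path as a new field. Note the unused FlowProcess/HadoopFlowProcess imports in LogParser.java below, which hint at this direction.

package logparser;

import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;

public class FilePathYearFunction extends BaseOperation implements Function {

    public FilePathYearFunction( Fields fieldDeclaration ) {
        super( 0, fieldDeclaration ); // takes no tuple arguments; declares e.g. new Fields( "year" )
    }

    public void operate( FlowProcess flowProcess, FunctionCall functionCall ) {
        // Hadoop sets "map.input.file" to the path of the split being read,
        // e.g. .../syslogs/cdh-dn03/2013/05/messages
        String path = ( (HadoopFlowProcess) flowProcess ).getJobConf().get( "map.input.file" );
        String[] segments = path.split( "/" );
        String year = segments[ segments.length - 3 ]; // <node>/<year>/<month>/messages
        functionCall.getOutputCollector().add( new Tuple( year ) );
    }
}

It would be wired in with something like: new Each( importAndParsePipe, Fields.NONE, new FilePathYearFunction( new Fields( "year" ) ), Fields.ALL ).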
Sample log data
---------------
May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal
May 3 11:53:31 cdh-dn03 kernel: registered taskstats version 1
May 3 11:53:31 cdh-dn03 kernel: sr0: scsi3-mmc drive: 32x/32x xa/form2 tray
May 3 11:53:31 cdh-dn03 kernel: piix4_smbus 0000:00:07.0: SMBus base address uninitialized - upgrade BIOS or use force_addr=0xaddr
May 3 11:53:31 cdh-dn03 kernel: nf_conntrack version 0.5.0 (7972 buckets, 31888 max)
May 3 11:53:57 cdh-dn03 kernel: hrtimer: interrupt took 11250457 ns
May 3 11:53:59 cdh-dn03 ntpd_initres[1705]: host name not found: 0.rhel.pool.ntp.org
Data structure
--------------
Month = May
Day = 3
Time = 11:52:54
Node = cdh-dn03
Process = init:
Log msg = tty (/dev/tty6) main process (1208) killed by TERM signal
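These fields correspond, in order, to the six capture groups of the regex the Cascading job uses below. A quick standalone check (illustrative sketch; RegexCheck is a made-up name, not part of the gist):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexCheck {
    public static void main( String[] args ) {
        // Same pattern as sysLogRegex in LogParser.java
        String sysLogRegex = "(\\w+)\\s+(\\d+)\\s+(\\d+:\\d+:\\d+)\\s+(\\w+\\W*\\w*)\\s+(.*?\\:)\\s+(.*$)";
        String line = "May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal";
        Matcher m = Pattern.compile( sysLogRegex ).matcher( line );
        if ( m.matches() ) {
            // Groups 1..6 -> month, day, time, node, process, message
            for ( int i = 1; i <= m.groupCount(); i++ )
                System.out.println( i + " : " + m.group( i ) );
        }
    }
}

Note that the process group keeps the trailing colon ("init:"); the TRANSFORM step in the job strips it.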
Directory structure of logs
---------------------------
cascadingSamples
   data
      syslogs
         <<Node-Name>>
            <<Year>>
               <<Month>>
                  messages
Specifically...
LogParser/data/syslogs/cdh-dev01/2013/04/messages
LogParser/data/syslogs/cdh-dev01/2013/05/messages
LogParser/data/syslogs/cdh-dn01/2013/05/messages
LogParser/data/syslogs/cdh-dn02/2013/04/messages
LogParser/data/syslogs/cdh-dn02/2013/05/messages
LogParser/data/syslogs/cdh-dn03/2013/04/messages
LogParser/data/syslogs/cdh-dn03/2013/05/messages
LogParser/data/syslogs/cdh-jt01/2013/04/messages
LogParser/data/syslogs/cdh-jt01/2013/05/messages
LogParser/data/syslogs/cdh-nn01/2013/05/messages
LogParser/data/syslogs/cdh-vms/2013/05/messages
LogParser.java
==============
package logparser;
import java.util.Properties;
import java.util.Collections;
import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.tap.hadoop.GlobHfs;
import cascading.tap.MultiSourceTap;
import cascading.tap.hadoop.Hfs;
import cascading.operation.regex.RegexParser;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.property.AppProps;
import cascading.scheme.Scheme;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.operation.aggregator.Count;
import cascading.operation.expression.ExpressionFunction;
public class LogParser {

    public static void main( String[] args ) {

        // {{
        // INSTANTIATE/INITIALIZE

        // Set the current job jar
        Properties properties = new Properties();
        AppProps.setApplicationJarClass( properties, LogParser.class );
        HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

        // Arguments
        String inputPath = args[ 0 ];
        String outputPath = args[ 1 ];
        String errorPath = args[ 2 ];
        String reportPath = args[ 3 ];

        // Scheme for the sinks; note that all three sinks share this one instance,
        // so the last setNumSinkParts() call below is the one that applies
        TextLine sinkTextLineScheme = new TextLine();

        // Define what the input file looks like; "offset" is bytes from the beginning
        TextLine sourceTextLineScheme = new TextLine( new Fields( "offset", "line" ) );

        // The inputPath is a file glob, so GlobHfs is used below
        GlobHfs sourceFilesGlob = new GlobHfs( sourceTextLineScheme, inputPath );

        // Create a SOURCE tap to read the resources matching the HDFS glob
        Tap sourceSyslogTap = new MultiSourceTap( sourceFilesGlob );

        // Create a SINK tap to write parsed logs to HDFS
        sinkTextLineScheme.setNumSinkParts( 2 );
        Tap sinkParsedLogTap = new Hfs( sinkTextLineScheme, outputPath, SinkMode.REPLACE );

        // Create a SINK tap to write reports to HDFS
        sinkTextLineScheme.setNumSinkParts( 1 );
        Tap sinkReportTap = new Hfs( sinkTextLineScheme, reportPath, SinkMode.REPLACE );

        // Create a TRAP tap to write records that failed parsing
        sinkTextLineScheme.setNumSinkParts( 1 );
        Tap sinkTrapTap = new Hfs( sinkTextLineScheme, errorPath, SinkMode.REPLACE );
        // }}

        // {{
        // EXTRACT/PARSE

        // Declare the field names
        Fields sysLogFields = new Fields( "month", "day", "time", "node", "process", "message" );

        // Define the regex pattern to parse the log file with
        String sysLogRegex = "(\\w+)\\s+(\\d+)\\s+(\\d+:\\d+:\\d+)\\s+(\\w+\\W*\\w*)\\s+(.*?\\:)\\s+(.*$)";

        // Declare which regex groups to keep; each group is assigned the
        // corresponding field name from 'sysLogFields', above
        int[] keepParsedGroups = { 1, 2, 3, 4, 5, 6 };

        // Create the parser
        RegexParser parser = new RegexParser( sysLogFields, sysLogRegex, keepParsedGroups );

        // Import & parse pipe
        // Create the import pipe element, with the name 'import', and with the input argument named "line";
        // replace the incoming tuple with the parser results:
        // "line" -> parser -> month, day, time, node, process, message
        Pipe importAndParsePipe = new Each( "import", new Fields( "line" ), parser, Fields.RESULTS );
        // }}

        // {{
        // TRANSFORM

        // Transform the process field for better reporting on logs:
        // strip the process ID, if present, and convert to lowercase;
        // e.g. change "ntpd[1302]:" to "ntpd", and "init:" to "init"
        String expression = "process.substring(0, (process.indexOf('[') == -1 ? process.length()-1 : process.indexOf('['))).toLowerCase()";
        Fields fieldProcess = new Fields( "process" );
        ExpressionFunction expFunc = new ExpressionFunction( fieldProcess, expression, String.class );

        // Pipe for the transformed data
        Pipe scrubbedDataPipe = new Each( importAndParsePipe, fieldProcess, expFunc, Fields.REPLACE );
        // }}

        // {{
        // REPORT/ANALYZE

        // Capture counts by process as a report, sorted by process name
        // --------------------------------------------------------------
        //   process   count()
        //   e.g.      sshd    4
        Pipe reportPipe = new Pipe( "reportByProcess", scrubbedDataPipe );
        Fields keyFields = new Fields( "process" );
        Fields countField = new Fields( "countOfEvents" );

        reportPipe = new GroupBy( reportPipe, keyFields );
        reportPipe = new Every( reportPipe, keyFields, new Count( countField ), Fields.ALL );
        reportPipe = new GroupBy( reportPipe,
                                  keyFields,
                                  countField,
                                  false ); // secondary sort on the count; false = ascending, true = descending
        // End of reports
        // }}

        // {{
        // EXECUTE

        // Connect the taps, pipes, etc., into a flow & execute
        FlowDef flowDef = FlowDef.flowDef()
            .setName( "Log parser" )
            .addSource( importAndParsePipe, sourceSyslogTap )
            .addTailSink( scrubbedDataPipe, sinkParsedLogTap )
            .addTailSink( reportPipe, sinkReportTap )
            .addTrap( importAndParsePipe, sinkTrapTap );

        Flow flow = flowConnector.connect( flowDef );
        flow.complete();
        // }}
    }
}
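A quick standalone check of the scrub expression used in the TRANSFORM step above (illustrative only; ScrubCheck is a made-up name, not part of the gist):

public class ScrubCheck {

    // Same logic as the Janino expression passed to ExpressionFunction
    static String scrub( String process ) {
        return process.substring( 0,
            ( process.indexOf( '[' ) == -1 ? process.length() - 1
                                           : process.indexOf( '[' ) ) ).toLowerCase();
    }

    public static void main( String[] args ) {
        System.out.println( scrub( "ntpd[1302]:" ) ); // -> ntpd
        System.out.println( scrub( "init:" ) );       // -> init
    }
}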
build.gradle
============
apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'eclipse'

archivesBaseName = 'logparser-cascading'

repositories {
    mavenLocal()
    mavenCentral()
    mavenRepo name: 'conjars', url: 'http://conjars.org/repo/'
}

ext.cascadingVersion = '2.5.1'

dependencies {
    compile( group: 'cascading', name: 'cascading-core', version: cascadingVersion )
    compile( group: 'cascading', name: 'cascading-hadoop', version: cascadingVersion )
}

jar {
    description = "Assembles a Hadoop-ready jar file"
    // Bundle the compile-time dependencies under lib/ inside the jar,
    // where 'hadoop jar' expects to find them
    doFirst {
        into( 'lib' ) {
            from configurations.compile
        }
    }
    manifest {
        attributes( "Main-Class": "logparser.LogParser" )
    }
}
Gradle
=================
$ gradle clean jar

Should generate a jar, with the dependencies bundled under lib/, at build/libs/logparser-cascading.jar.
Load data to HDFS
==================
$ hadoop fs -mkdir cascadingSamples
$ cd ~
$ hadoop fs -put cascadingSamples/data cascadingSamples
$ hadoop fs -put cascadingSamples/jars cascadingSamples
Run program
============
The four arguments map to args[0]..args[3] in LogParser: the input glob, the output directory for parsed logs, the trap directory for records that fail parsing, and the report directory.

hadoop jar cascadingSamples/jars/logparser-cascading.jar "cascadingSamples/data/syslogs/*/*/*/" "cascadingSamples/Output-LogParser" "cascadingSamples/Output-LogParser/traps" "cascadingSamples/Output-LogParser/reports"
Output files
=============
$ hadoop fs -ls -R cascadingSamples |grep 'part*' | awk '{print $8}'
cascadingSamples/Output-LogParser/part-00000
cascadingSamples/Output-LogParser/part-00001
cascadingSamples/Output-LogParser/part-00002
cascadingSamples/Output-LogParser/part-00003
cascadingSamples/Output-LogParser/part-00004
cascadingSamples/Output-LogParser/part-00005
cascadingSamples/Output-LogParser/part-00006
cascadingSamples/Output-LogParser/part-00007
cascadingSamples/Output-LogParser/part-00008
cascadingSamples/Output-LogParser/part-00009
cascadingSamples/Output-LogParser/part-00010
cascadingSamples/Output-LogParser/reports/part-00000
cascadingSamples/Output-LogParser/traps/part-m-00001-00006
cascadingSamples/Output-LogParser/traps/part-m-00002-00006
Parsed log
===========
$ hadoop fs -cat cascadingSamples/Output-LogParser/part-00003 | less
May 3 11:51:50 cdh-dn02 init tty (/dev/tty6) main process (1208) killed by TERM signal
May 3 11:52:26 cdh-dn02 kernel nf_conntrack version 0.5.0 (7972 buckets, 31888 max)
May 3 11:52:51 cdh-dn02 kernel hrtimer: interrupt took 6222750 ns
May 3 11:52:53 cdh-dn02 ntpd_initres host name not found: 0.rhel.pool.ntp.org
Report
===========
$ hadoop fs -cat cascadingSamples/Output-LogParser/reports/part-00000 | less
console-kit-daemon 7
gnome-session 11
init 166
kernel 810
login 2
networkmanager 7
nm-dispatcher.action 4
ntpd_initres 4133
polkit-agent-helper-1 8
pulseaudio 18
spice-vdagent 15
sshd 6
sudo 8
udevd 6
Records that failed parsing
============================
These records have no colon after the process field, so the regex does not match them and Cascading routes them to the trap tap.
$ hadoop fs -cat cascadingSamples/Output-LogParser/traps/part*
May 7 00:40:53 cdh-vms /etc/sysconfig/network-scripts/i Device eth0 does not seem to be present, delaying initialization.
May 7 00:42:13 cdh-vms /etc/sysconfig/network-scripts/i Device eth0 does not seem to be present, delaying initialization.
May 7 00:43:38 cdh-vms /etc/sysconfig/network-scripts/i Device eth0 does not seem to be present, delaying initialization.
May 7 00:45:01 cdh-vms /etc/sysconfig/network-scripts/i Device eth0 does not seem to be present, delaying initialization.
May 7 00:47:18 cdh-vms /etc/sysconfig/network-scripts/i Device eth0 does not seem to be present, delaying initialization.
May 7 00:47:41 cdh-vms /etc/sysconfig/network-scripts/i Device eth0 does not seem to be present, delaying initialization.
Record count of input [syslogs]
===============================
$ hadoop fs -cat cascadingSamples/data/syslogs/*/*/*/messages | wc -l
5207
Record count of output [parsed logs + records that failed parsing]
===================================================================
$ echo $((`hadoop fs -cat cascadingSamples/Output-LogParser/part* | wc -l`+`hadoop fs -cat cascadingSamples/Output-LogParser/traps/part* | wc -l`))
5207
Code and data download
=======================
Github:
https://github.com/airawat/LogParser/tree/master/cascadingSamples
Directories relevant to this post:
===================================
$ tree -if --noreport LogParser
LogParser
LogParser/cascadingSamples
LogParser/cascadingSamples/jars
LogParser/cascadingSamples/jars/logparser-cascading.jar
LogParser/cascadingSamples/src
LogParser/cascadingSamples/src/main
LogParser/cascadingSamples/src/main/java
LogParser/cascadingSamples/src/main/java/logparser
LogParser/cascadingSamples/src/main/java/logparser/LogParser.java
LogParser/data
LogParser/data/syslogs
LogParser/data/syslogs/cdh-dev01
LogParser/data/syslogs/cdh-dev01/2013
LogParser/data/syslogs/cdh-dev01/2013/04
LogParser/data/syslogs/cdh-dev01/2013/04/messages
LogParser/data/syslogs/cdh-dev01/2013/05
LogParser/data/syslogs/cdh-dev01/2013/05/messages
LogParser/data/syslogs/cdh-dn01
LogParser/data/syslogs/cdh-dn01/2013
LogParser/data/syslogs/cdh-dn01/2013/05
LogParser/data/syslogs/cdh-dn01/2013/05/messages
LogParser/data/syslogs/cdh-dn02
LogParser/data/syslogs/cdh-dn02/2013
LogParser/data/syslogs/cdh-dn02/2013/04
LogParser/data/syslogs/cdh-dn02/2013/04/messages
LogParser/data/syslogs/cdh-dn02/2013/05
LogParser/data/syslogs/cdh-dn02/2013/05/messages
LogParser/data/syslogs/cdh-dn03
LogParser/data/syslogs/cdh-dn03/2013
LogParser/data/syslogs/cdh-dn03/2013/04
LogParser/data/syslogs/cdh-dn03/2013/04/messages
LogParser/data/syslogs/cdh-dn03/2013/05
LogParser/data/syslogs/cdh-dn03/2013/05/messages
LogParser/data/syslogs/cdh-jt01
LogParser/data/syslogs/cdh-jt01/2013
LogParser/data/syslogs/cdh-jt01/2013/04
LogParser/data/syslogs/cdh-jt01/2013/04/messages
LogParser/data/syslogs/cdh-jt01/2013/05
LogParser/data/syslogs/cdh-jt01/2013/05/messages
LogParser/data/syslogs/cdh-nn01
LogParser/data/syslogs/cdh-nn01/2013
LogParser/data/syslogs/cdh-nn01/2013/05
LogParser/data/syslogs/cdh-nn01/2013/05/messages
LogParser/data/syslogs/cdh-vms
LogParser/data/syslogs/cdh-vms/2013
LogParser/data/syslogs/cdh-vms/2013/05
LogParser/data/syslogs/cdh-vms/2013/05/messages