Sample of an Oozie workflow with a streaming action that parses Syslog-generated log files using Python regex
This gist includes Oozie workflow components (a streaming MapReduce action) that execute
Python mapper and reducer scripts to parse Syslog-generated log files using regex.
Use case: Count the number of occurrences of logged processes, by month and process.
Pictorial overview of workflow:
Sample data and structure: 01-SampleDataAndStructure
Data and script download: 02-DataAndScriptDownload
Data load commands: 03-HdfsLoadCommands
Python mapper script: 04A-PythonMapperScript
Python reducer script: 04B-PythonReducerScript
Python script test commands: 05-PythonScriptTest
Oozie job properties: 06-JobProperties
Oozie workflow: 07-OozieWorkflowXML
Oozie job execution commands: 08-OozieCommands
Oozie job output: 09-Output
Oozie web GUI: 10-OozieWebGUIScreenshots
Sample data
May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal
May 3 11:53:31 cdh-dn03 kernel: registered taskstats version 1
May 3 11:53:31 cdh-dn03 kernel: sr0: scsi3-mmc drive: 32x/32x xa/form2 tray
May 3 11:53:31 cdh-dn03 kernel: piix4_smbus 0000:00:07.0: SMBus base address uninitialized - upgrade BIOS or use force_addr=0xaddr
May 3 11:53:31 cdh-dn03 kernel: nf_conntrack version 0.5.0 (7972 buckets, 31888 max)
May 3 11:53:57 cdh-dn03 kernel: hrtimer: interrupt took 11250457 ns
May 3 11:53:59 cdh-dn03 ntpd_initres[1705]: host name not found:
Structure (fields of the first sample line):
Month = May
Day = 3
Time = 11:52:54
Node = cdh-dn03
Process = init:
Log msg = tty (/dev/tty6) main process (1208) killed by TERM signal
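The field breakdown above corresponds to the six capture groups of `data_pattern` in the mapper script. A minimal sketch that makes the correspondence explicit, assuming the same pattern rewritten with named groups; the group names and the `parse_syslog_line` helper are illustrative only and not part of the original scripts:

```python
import re

# Same pattern as data_pattern in the mapper, with (illustrative) named groups.
LOG_PATTERN = re.compile(
    r"(?P<month>\w+)\s+(?P<day>\d+)\s+(?P<time>\d+:\d+:\d+)\s+"
    r"(?P<node>\w+\W*\w*)\s+(?P<process>.*?\:)\s+(?P<message>.*$)"
)


def parse_syslog_line(line):
    """Return a dict of the six fields of one syslog line, or None if it does not match."""
    match = LOG_PATTERN.search(line.strip())
    return match.groupdict() if match else None


if __name__ == "__main__":
    sample = ("May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) "
              "killed by TERM signal")
    for name, value in parse_syslog_line(sample).items():
        print("%s = %s" % (name, value))
```

Note that the process group is a lazy match up to the first colon, so it keeps the trailing `:` and any bracketed number, e.g. `ntpd_initres[1705]:`.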
Data download
Email me if you have access issues.
Directory structure applicable for this blog:
Load downloaded files to HDFS
--Create project directory
$ hadoop fs -mkdir oozieProject
--Deploy data and workflow
$ hadoop fs -put oozieProject/* oozieProject/
Directory structure on HDFS
$ hadoop fs -ls -R oozieProject/workflowStreamingMRActionPy | awk '{print $8}'
/* **************************************** */
/* Python mapper script: */
/* **************************************** */
#!/usr/bin/env python
import sys
import re

# Groups: (1) month, (2) day, (3) time, (4) node, (5) process, (6) log message
data_pattern = r"(\w+)\s+(\d+)\s+(\d+:\d+:\d+)\s+(\w+\W*\w*)\s+(.*?\:)\s+(.*$)"
regex_obj = re.compile(data_pattern, re.VERBOSE)

# Get all lines from stdin
for strLineRead in sys.stdin:
    # Remove leading and trailing whitespace
    strLineRead = strLineRead.strip()
    # Split the line into fields
    parsed_log = regex_obj.search(strLineRead)
    if parsed_log:
        # Output key-value pair: "<month>-<process>" TAB "1"
        print('%s\t%s' % (parsed_log.group(1) + "-" + parsed_log.group(5), "1"))
        # Debug output of the individual fields (keep commented out in the job):
        #print("month_name: ", parsed_log.group(1))
        #print("day: ", parsed_log.group(2))
        #print("time: ", parsed_log.group(3))
        #print("node: ", parsed_log.group(4))
        #print("event: ", parsed_log.group(5))
        #print("message: ", parsed_log.group(6))
/* ****************************************** */
/* Python reducer script: */
/* ****************************************** */
#!/usr/bin/env python
import sys

eventCountArray = {}

# Input is from STDIN
for line in sys.stdin:
    # Remove leading and trailing whitespace
    line = line.strip()
    try:
        # Parse the input from the mapper
        event, count = line.split('\t', 1)
        # Cast count to int
        count = int(count)
    except ValueError:
        # Skip malformed lines (no tab, or a count that is not a number)
        continue
    # Compute event count
    if event in eventCountArray:
        eventCountArray[event] = eventCountArray[event] + count
    else:
        eventCountArray[event] = count

# Write the results (unsorted) to stdout
for event in eventCountArray.keys():
    print('%s\t%s' % (event, eventCountArray[event]))
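The reducer above keeps a dictionary of every key it has seen. Because Hadoop Streaming sorts mapper output by key before it reaches the reducer, identical keys arrive on adjacent lines, so a reducer can instead total each run of equal keys with `itertools.groupby`. A sketch of that alternative, assuming the mapper's `key<TAB>count` format; `parse` and `reduce_sorted` are hypothetical helper names:

```python
from itertools import groupby


def parse(lines):
    """Yield (key, count) pairs from 'key<TAB>count' lines, skipping malformed ones."""
    for line in lines:
        try:
            key, count = line.strip().split("\t", 1)
            yield key, int(count)
        except ValueError:
            # No tab, or a count that is not an integer
            continue


def reduce_sorted(lines):
    """Yield (key, total) for input that is already sorted by key."""
    for key, group in groupby(parse(lines), key=lambda pair: pair[0]):
        yield key, sum(count for _, count in group)
```

In a streaming script the driver loop would be `for key, total in reduce_sorted(sys.stdin): print("%s\t%d" % (key, total))` (with `import sys`). Unlike the dictionary version it emits keys in sorted order and holds only one key at a time, but unsorted input would split a key into several partial totals, which is why the local test pipeline runs `sort` between mapper and reducer.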
# Testing the python scripts outside of oozie
#Test the mapper from the directory where the data is located:
cat oozieProject/data/*/*/*/*/* | python oozieProject/workflowStreamingMRActionPy/
#Test mapper and reducer
cat oozieProject/data/*/*/*/*/* | python oozieProject/workflowStreamingMRActionPy/ | sort | python oozieProject/workflowStreamingMRActionPy/ | sort
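The shell pipeline above (`cat | mapper | sort | reducer | sort`) can also be emulated in a single Python process, which is handy for unit-testing the parsing logic without Hadoop. A sketch assuming the mapper's regex and key format; `map_lines`, `reduce_lines` and `run_locally` are hypothetical names, and the sample lines are taken from the sample data:

```python
import re

# Pattern copied from the mapper script.
LOG_REGEX = re.compile(r"(\w+)\s+(\d+)\s+(\d+:\d+:\d+)\s+(\w+\W*\w*)\s+(.*?\:)\s+(.*$)")


def map_lines(lines):
    """Mapper step: emit 'month-process<TAB>1' for every line the regex matches."""
    for line in lines:
        match = LOG_REGEX.search(line.strip())
        if match:
            yield "%s-%s\t1" % (match.group(1), match.group(5))


def reduce_lines(lines):
    """Reducer step: total the counts per key."""
    totals = {}
    for line in lines:
        key, count = line.split("\t", 1)
        totals[key] = totals.get(key, 0) + int(count)
    return totals


def run_locally(lines):
    """Emulate 'cat data | mapper | sort | reducer | sort' in one process."""
    mapped = sorted(map_lines(lines))
    return sorted(reduce_lines(mapped).items())


if __name__ == "__main__":
    sample = [
        "May 3 11:52:54 cdh-dn03 init: tty (/dev/tty6) main process (1208) killed by TERM signal",
        "May 3 11:53:31 cdh-dn03 kernel: registered taskstats version 1",
        "May 3 11:53:31 cdh-dn03 kernel: sr0: scsi3-mmc drive: 32x/32x xa/form2 tray",
        "May 3 11:53:59 cdh-dn03 ntpd_initres[1705]: host name not found:",
    ]
    for key, total in run_locally(sample):
        print("%s\t%d" % (key, total))
```

Python's `sorted` orders strings by code point, which can differ from `sort` under some locales; the per-key totals are the same either way.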
#Delete prior copy of scripts
hadoop fs -rm -R oozieProject/workflowStreamingMRActionPy/
#Load application, if not already done.
hadoop fs -put ~/oozieProject/workflowStreamingMRActionPy/ oozieProject/
#Run on cluster (update paths as needed)
hadoop jar /opt/cloudera/parcels/CDH-4.2.0-1.cdh4.2.0.p0.10/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.2.0.jar -jobconf mapred.reduce.tasks=1 -file oozieProject/workflowStreamingMRActionPy/ -mapper oozieProject/workflowStreamingMRActionPy/ -file oozieProject/workflowStreamingMRActionPy/ -reducer oozieProject/workflowStreamingMRActionPy/ -input oozieProject/data/*/*/*/*/* -output oozieProject/workflowStreamingMRActionPy/output-streaming-manualRun
#View output
$ hadoop fs -ls -R oozieProject/workflowStreamingMRActionPy/output-streaming-manualRun/part* | awk '{print $8}' | xargs hadoop fs -cat
May-spice-vdagent[2020]: 1
May-ntpd_initres[997]: 3
May-nm-dispatcher.action: 4
May-NetworkManager[1232]: 1
May-init: 166
# -------------------------------------------------
# This is the job properties file -
# -------------------------------------------------
# Replace name node and job tracker information with that specific to your cluster
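The job properties themselves are not reproduced in this copy. Oozie fills `${...}` parameters in workflow.xml, such as `${outputDir}`, from the job properties. The sketch below is only a simplified illustration of that key=value substitution using Python's `string.Template`, not Oozie's own EL evaluator; the property name `nameNode` and all values and paths are hypothetical placeholders:

```python
from string import Template


def parse_properties(text):
    """Parse simple 'key=value' lines, ignoring blank lines and '#' comments.

    Simplified: real Java .properties files also allow ':' separators,
    line continuations and escapes, which this sketch does not handle.
    """
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def resolve(expression, props):
    """Substitute ${name} placeholders; unknown names are left untouched."""
    return Template(expression).safe_substitute(props)


if __name__ == "__main__":
    # Hypothetical values; replace with the settings for your cluster.
    sample = """
    # Replace name node and job tracker information with that specific to your cluster
    nameNode=hdfs://namenode-host:8020
    outputDir=${nameNode}/user/someuser/oozieProject/workflowStreamingMRActionPy/output
    """
    props = parse_properties(sample)
    print(resolve(props["outputDir"], props))
```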
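<!--Oozie workflow file: workflow.xml -->
<workflow-app name="WorkflowStreamingMRAction-Python" xmlns="uri:oozie:workflow:0.1">
    <start to="streamingaAction"/>
    <action name="streamingaAction">
        <map-reduce>
            <!-- job-tracker and name-node elements are not reproduced in this copy -->
            <prepare>
                <delete path="${outputDir}"/>
            </prepare>
            <!-- streaming (mapper/reducer) and configuration elements are not reproduced in this copy -->
        </map-reduce>
        <ok to="end"/>
        <error to="killJobAction"/>
    </action>
    <kill name="killJobAction">
        <message>"Killed job due to error: ${wf:errorMessage(wf:lastErrorNode())}"</message>
    </kill>
    <end name="end" />
</workflow-app>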
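Each node in the workflow hands control on through a `to` attribute (on `<start>`, and on the `<ok>`/`<error>` children of `<action>`), and every target must be the `name` of another node. A small sketch that checks this before submitting, using the standard-library ElementTree parser; `check_transitions` is a hypothetical helper, and it ignores decision, fork and join nodes:

```python
import xml.etree.ElementTree as ET

# Namespace declared on <workflow-app> in the workflow above.
NS = {"wf": "uri:oozie:workflow:0.1"}


def check_transitions(workflow_xml):
    """Return the transition targets that do not name a node in the workflow."""
    root = ET.fromstring(workflow_xml)
    # Every named child node (action, kill, end) is a valid transition target.
    names = {node.get("name") for node in root if node.get("name")}
    targets = [root.find("wf:start", NS).get("to")]
    for action in root.findall("wf:action", NS):
        for tag in ("wf:ok", "wf:error"):
            transition = action.find(tag, NS)
            if transition is not None:
                targets.append(transition.get("to"))
    return [target for target in targets if target not in names]
```

For a well-formed workflow.xml whose transitions all resolve, the result is an empty list; a misspelled target such as `to="endd"` would be returned in the list.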
08. Oozie commands
Note: Replace the Oozie server and port with those specific to your cluster.
1) Submit job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/workflowStreamingMRActionPy/ -submit
job: 0000017-130712212133144-oozie-oozi-W
2) Run job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -start 0000017-130712212133144-oozie-oozi-W
3) Check the status:
$ oozie job -oozie http://cdh-dev01:11000/oozie -info 0000017-130712212133144-oozie-oozi-W
4) Suspend workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -suspend 0000017-130712212133144-oozie-oozi-W
5) Resume workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -resume 0000017-130712212133144-oozie-oozi-W
6) Re-run workflow:
$ oozie job -oozie http://cdh-dev01:11000/oozie -config oozieProject/workflowStreamingMRActionPy/ -rerun 0000017-130712212133144-oozie-oozi-W
7) Should you need to kill the job:
$ oozie job -oozie http://cdh-dev01:11000/oozie -kill 0000017-130712212133144-oozie-oozi-W
8) View the job log:
$ oozie job -oozie http://cdh-dev01:11000/oozie -log 0000017-130712212133144-oozie-oozi-W
Logs are available at:
/var/log/oozie on the Oozie server.
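When these steps are scripted, the job ID printed by `-submit` (the `job: <id>` line in step 1) has to be fed into the later commands. A sketch of two hypothetical helpers, one that pulls the ID out of the CLI output and one that assembles argument lists in the same order as the commands above; it does not contact Oozie itself:

```python
import re

# Workflow job IDs look like 0000017-130712212133144-oozie-oozi-W (see step 1).
JOB_ID_PATTERN = re.compile(r"^job:\s*(\S+-W)\s*$", re.MULTILINE)


def parse_job_id(submit_output):
    """Extract the workflow job ID from the 'job: <id>' line printed on submit."""
    match = JOB_ID_PATTERN.search(submit_output)
    if match is None:
        raise ValueError("no job ID found in oozie output")
    return match.group(1)


def oozie_command(oozie_url, action, job_id=None, config=None):
    """Build an argument list for the oozie CLI, e.g. action='start' or 'info'."""
    args = ["oozie", "job", "-oozie", oozie_url]
    if config is not None:
        args += ["-config", config]
    args.append("-" + action)
    if job_id is not None:
        args.append(job_id)
    return args
```

The lists can be passed to `subprocess.run(args, capture_output=True, text=True)` (Python 3.7+), with `parse_job_id` applied to the `stdout` of the submit call.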
09. Output
$ hadoop fs -ls -R oozieProject/workflowStreamingMRActionPy/output/part-* | awk '{print $8}' | xargs hadoop fs -cat
May-gnome-session[2010]: 3
May-ntpd_initres[1872]: 24
May-ntpd_initres[1705]: 792
May-spice-vdagent[2114]: 1
May-pulseaudio[2135]: 1
May-pulseaudio[2076]: 1
May-spice-vdagent[1974]: 1
May-ntpd_initres[1720]: 792
May-ntpd_initres[1084]: 6
May-spice-vdagent[2109]: 1
May-pulseaudio[2257]: 1
May-NetworkManager[1292]: 1
May-pulseaudio[2032]: 1
May-kernel: 810
May-ntpd_initres[1592]: 798
May-NetworkManager[1342]: 1
May-polkit-agent-helper-1[2036]: 8
May-spice-vdagent[1955]: 1
May-console-kit-daemon[1779]: 4
May-spice-vdagent[2016]: 1
May-ntpd_initres[1026]: 9
May-pulseaudio[2039]: 1
Oozie web GUI screenshots
Available at:


airawat commented Jul 15, 2013

To do: The processes have session IDs attached to them in square brackets that need to be stripped.
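One way to handle this to-do is to remove the bracketed number (the process ID in the usual `program[pid]:` syslog format) from each key and re-total, so that, for example, all `ntpd_initres[...]` entries collapse into a single `May-ntpd_initres:` count. A sketch; `strip_pid` and `merge_counts` are hypothetical helpers, and the same `PID_SUFFIX.sub("", ...)` call could instead be applied to group 5 inside the mapper before the key is emitted:

```python
import re
from collections import Counter

# Matches a bracketed numeric ID such as "[1705]" in "ntpd_initres[1705]:".
PID_SUFFIX = re.compile(r"\[\d+\]")


def strip_pid(key):
    """Turn 'May-ntpd_initres[1705]:' into 'May-ntpd_initres:'."""
    return PID_SUFFIX.sub("", key)


def merge_counts(pairs):
    """Re-total (key, count) pairs after removing the bracketed IDs."""
    totals = Counter()
    for key, count in pairs:
        totals[strip_pid(key)] += count
    return dict(totals)
```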
