The Oozie example from an article on InfoQ, but sketched as a Spring Batch state machine instead.
Created
September 19, 2011 22:10
-
-
Save dsyer/1227755 to your computer and use it in GitHub Desktop.
Simple sketch of Spring Batch jobs mirroring the Oozie example
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring Batch sketch of the "processDirsWF" workflow.

  The job consists of one partitioned step: dirs2ProcessPartitioner supplies the partitions and
  each partition runs the job referenced as "processDir".

  The beans "failureListener" and "processDir" are referenced here but not defined in this file;
  they are expected to come from another configuration file.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:beans="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd">

  <job id="processDirsWF" xmlns="http://www.springframework.org/schema/batch">
    <listeners>
      <listener>
        <beans:ref bean="failureListener" />
      </listener>
    </listeners>

    <!--
      No "next" attribute: this is the only step of the job, so the job ends when it completes.
      (Previously next pointed back at this same step, i.e. a transition onto itself.)
    -->
    <step id="forkSubWorkflows">
      <partition partitioner="dirs2ProcessPartitioner">
        <step>
          <job ref="processDir" />
        </step>
      </partition>
    </step>
  </job>

  <!-- Partitioner configured with the HDFS input and output base locations. -->
  <bean id="dirs2ProcessPartitioner" class="com.navteq.oozie.GenerateLookupDirsPartitioner">
    <property name="inputBase" value="hdfs://sachicn001:8020/user/data/probedev/files/" />
    <property name="outputBase" value="hdfs://sachicn001:8020/user/gtitievs/probe-output" />
  </bean>

</beans>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring Batch sketch of the "processDir" workflow.

  Flow: makeIngestDecision routes to "ingest" (followed by "archive") on COMPLETE,
  to "sendEmail" on INCOMPLETE, and ends the job on any other status.

  The beans "getDirInfoListener" and "failureListener" are referenced here but not defined in
  this file; they are expected to come from another configuration file.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:beans="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd">

  <job id="processDir" xmlns="http://www.springframework.org/schema/batch">
    <listeners>
      <listener>
        <beans:ref bean="getDirInfoListener" />
      </listener>
      <listener>
        <beans:ref bean="failureListener" />
      </listener>
    </listeners>

    <!--
      NOTE(review): in the Oozie version of this workflow the condition used below as
      incompleteExpression leads to "end", and "sendEmail" is the default branch. Here INCOMPLETE
      leads to "sendEmail" and everything else ends the job. Confirm against MakeIngestDecider
      that this routing is intended.
    -->
    <decision id="makeIngestDecision" decider="makeIngestDecider">
      <next on="COMPLETE" to="ingest" />
      <next on="INCOMPLETE" to="sendEmail" />
      <end on="*" />
    </decision>

    <step id="sendEmail">
      <tasklet ref="sendEmailTasklet" />
    </step>

    <step id="ingest" next="archive">
      <!-- This could be a Spring Hadoop M-R step -->
      <tasklet ref="ingestTasklet" />
    </step>

    <step id="archive">
      <tasklet ref="archiveTasklet" />
    </step>
  </job>

  <!--
    Decider for makeIngestDecision. Both expressions index jobExecutionContext['getDirInfo'] for
    the keys 'dir.num-files' and 'dir.age'. Every indexer is now closed with "]"; the previous
    text closed some of them with ")" which left the brackets unbalanced.
  -->
  <bean id="makeIngestDecider" class="com.navteq.batch.MakeIngestDecider">
    <property name="inputBase" value="hdfs://sachicn001:8020/user/data/probedev/files/" />
    <property name="completeExpression">
      <value>
        jobExecutionContext['getDirInfo']['dir.num-files'] gt 23 or jobExecutionContext['getDirInfo']['dir.age'] gt 6
      </value>
    </property>
    <!-- Parentheses only spell out the existing precedence ("and" binds tighter than "or"). -->
    <property name="incompleteExpression">
      <value>
        jobExecutionContext['getDirInfo']['dir.num-files'] lt 0 or (jobExecutionContext['getDirInfo']['dir.age'] lt 1 and jobExecutionContext['getDirInfo']['dir.num-files'] lt 24)
      </value>
    </property>
  </bean>

  <bean id="sendEmailTasklet" class="com.navteq.batch.SendEmailTasklet">
    <!-- ... configuration elided in this sketch ... -->
  </bean>

  <bean id="ingestTasklet" class="com.navteq.batch.IngestTasklet">
    <!-- ... configuration elided in this sketch ... -->
  </bean>

  <bean id="archiveTasklet" class="com.navteq.batch.ArchiveTasklet">
    <!-- ... configuration elided in this sketch ... -->
  </bean>

</beans>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!--
  Oozie workflow "processDirsWF".

  Flow: getDirs2Process (java action, output captured) then fork into processDir0 … processDir7
  (one sub-workflow each), then join and end. Any action error transitions to the "fail" kill node.

  NOTE(review): xsi:schemaLocation below names a distcp-action XSD on a GitHub page rather than
  an Oozie workflow schema; confirm the intended schema URL.
-->
<workflow-app xmlns='uri:oozie:workflow:0.1' name='processDirsWF'
              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xsi:schemaLocation="uri:oozie:workflow:0.1 https://github.com/yahoo/oozie/blob/master/client/src/main/resources/distcp-action-0.1.xsd">

  <start to='getDirs2Process' />

  <!-- STEP ONE -->
  <action name='getDirs2Process'>
    <!--
      Output of this action is captured; the processDirN actions below read the keys
      dir0 … dir7 from it via wf:actionData('getDirs2Process').
      NOTE(review): presumably GenerateLookupDirs emits exactly those keys; confirm.
    -->
    <java>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <main-class>com.navteq.oozie.GenerateLookupDirs</main-class>
      <capture-output />
    </java>
    <ok to="forkSubWorkflows" />
    <error to="fail" />
  </action>

  <fork name="forkSubWorkflows">
    <path start="processDir0"/>
    <path start="processDir1"/>
    <path start="processDir2"/>
    <path start="processDir3"/>
    <path start="processDir4"/>
    <path start="processDir5"/>
    <path start="processDir6"/>
    <path start="processDir7"/>
  </fork>

  <action name="processDir0">
    <sub-workflow>
      <app-path>hdfs://sachicn001:8020/user/gtitievs/workflows/ingest</app-path>
      <configuration>
        <property>
          <name>inputDir</name>
          <value>hdfs://sachicn001:8020/user/data/probedev/files/${wf:actionData('getDirs2Process')['dir0']}</value>
        </property>
        <property>
          <name>outputDir</name>
          <value>hdfs://sachicn001:8020/user/gtitievs/probe-output/${wf:actionData('getDirs2Process')['dir0']}</value>
        </property>
        <property>
          <name>jobTracker</name>
          <value>${jobTracker}</value>
        </property>
        <property>
          <name>nameNode</name>
          <value>${nameNode}</value>
        </property>
        <property>
          <name>activeDir</name>
          <value>hdfs://sachicn001:8020/user/gtitievs/test-activeDir</value>
        </property>
        <property>
          <name>dirName</name>
          <value>${wf:actionData('getDirs2Process')['dir0']}</value>
        </property>
      </configuration>
    </sub-workflow>
    <ok to="joining"/>
    <error to="fail"/>
  </action>

  <!-- ... processDir1 through processDir6 are elided here; they follow the same pattern ... -->

  <action name="processDir7">
    <sub-workflow>
      <app-path>hdfs://sachicn001:8020/user/gtitievs/workflows/ingest</app-path>
      <configuration>
        <property>
          <name>inputDir</name>
          <value>hdfs://sachicn001:8020/user/data/probedev/files/${wf:actionData('getDirs2Process')['dir7']}</value>
        </property>
        <property>
          <name>outputDir</name>
          <value>hdfs://sachicn001:8020/user/gtitievs/probe-output/${wf:actionData('getDirs2Process')['dir7']}</value>
        </property>
        <!--
          jobTracker, nameNode and activeDir are passed exactly as in processDir0, so every
          forked sub-workflow receives the same set of properties.
        -->
        <property>
          <name>jobTracker</name>
          <value>${jobTracker}</value>
        </property>
        <property>
          <name>nameNode</name>
          <value>${nameNode}</value>
        </property>
        <property>
          <name>activeDir</name>
          <value>hdfs://sachicn001:8020/user/gtitievs/test-activeDir</value>
        </property>
        <property>
          <name>dirName</name>
          <value>${wf:actionData('getDirs2Process')['dir7']}</value>
        </property>
      </configuration>
    </sub-workflow>
    <ok to="joining"/>
    <error to="fail"/>
  </action>

  <join name="joining" to="end"/>

  <kill name="fail">
    <!-- Continuation line is left unindented so no extra whitespace enters the message text. -->
    <message>Java failed, error
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
  </kill>

  <end name='end' />

</workflow-app>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!--
  Oozie workflow "processDir".

  Flow: getDirInfo (java action, output captured) then makeIngestDecision, which goes to
  "end", "ingest" or "sendEmail". "ingest" continues to "archive-data", which then ends.
  Errors in getDirInfo / sendEmail go to the "fail" kill node; errors in ingest / archive-data
  go to the "ingest-fail" kill node.

  NOTE(review): xsi:schemaLocation below names a distcp-action XSD on a GitHub page rather than
  an Oozie workflow schema; confirm the intended schema URL.
-->
<workflow-app xmlns='uri:oozie:workflow:0.1' name='processDir'
              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xsi:schemaLocation="uri:oozie:workflow:0.1 https://github.com/yahoo/oozie/blob/master/client/src/main/resources/distcp-action-0.1.xsd">

  <start to='getDirInfo' />

  <!-- STEP ONE -->
  <action name='getDirInfo'>
    <!--
      Writes 2 properties:
        dir.num-files: returns -1 if dir doesn't exist, otherwise returns # of files in dir
        dir.age:       returns -1 if dir doesn't exist, otherwise returns age of dir in days
    -->
    <java>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <main-class>com.navteq.oozie.GetDirInfo</main-class>
      <arg>${inputDir}</arg>
      <capture-output />
    </java>
    <ok to="makeIngestDecision" />
    <error to="fail" />
  </action>

  <!-- STEP TWO -->
  <decision name="makeIngestDecision">
    <switch>
      <!-- dir doesn't exist, or it is less than 1 day old and has fewer than 24 files -->
      <case to="end">
        ${wf:actionData('getDirInfo')['dir.num-files'] lt 0 ||
        (wf:actionData('getDirInfo')['dir.age'] lt 1 and
        wf:actionData('getDirInfo')['dir.num-files'] lt 24)}
      </case>
      <!-- # of files >= 24, or dir is more than 6 days old -->
      <case to="ingest">
        ${wf:actionData('getDirInfo')['dir.num-files'] gt 23 ||
        wf:actionData('getDirInfo')['dir.age'] gt 6}
      </case>
      <default to="sendEmail" />
    </switch>
  </decision>

  <!-- EMAIL -->
  <action name="sendEmail">
    <java>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <main-class>com.navteq.oozie.StandaloneMailer</main-class>
      <arg>probedata2@navteq.com</arg>
      <arg>gregory.titievsky@navteq.com</arg>
      <arg>${inputDir}</arg>
      <arg>${wf:actionData('getDirInfo')['dir.num-files']}</arg>
      <arg>${wf:actionData('getDirInfo')['dir.age']}</arg>
    </java>
    <ok to="end" />
    <error to="fail" />
  </action>

  <!-- INGESTION -->
  <action name="ingest">
    <java>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <prepare>
        <delete path="${outputDir}" />
      </prepare>
      <configuration>
        <property>
          <name>mapred.reduce.tasks</name>
          <value>300</value>
        </property>
      </configuration>
      <main-class>com.navteq.probedata.drivers.ProbeIngest</main-class>
      <arg>-conf</arg>
      <arg>action.xml</arg>
      <arg>${inputDir}</arg>
      <arg>${outputDir}</arg>
    </java>
    <!-- Target must equal the action name exactly; a stray leading space was removed here. -->
    <ok to="archive-data" />
    <error to="ingest-fail" />
  </action>

  <!-- Archive Data -->
  <action name="archive-data">
    <fs>
      <move source='${inputDir}' target='/probe/backup/${dirName}' />
      <!-- NOTE(review): the same path was just moved above, so this delete looks redundant; confirm. -->
      <delete path='${inputDir}' />
    </fs>
    <ok to="end" />
    <error to="ingest-fail" />
  </action>

  <kill name="ingest-fail">
    <!-- Continuation line is left unindented so no extra whitespace enters the message text. -->
    <message>Ingestion failed, error
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
  </kill>

  <kill name="fail">
    <!-- Continuation line is left unindented so no extra whitespace enters the message text. -->
    <message>Java failed, error
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
  </kill>

  <end name='end' />

</workflow-app>
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment