Skip to content

Instantly share code, notes, and snippets.

@dsyer
Created September 19, 2011 22:10
Show Gist options
  • Save dsyer/1227755 to your computer and use it in GitHub Desktop.
Simple sketch of Spring Batch jobs that replicate an Oozie workflow example

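The Oozie workflow example from an InfoQ article, re-sketched as a Spring Batch state machine. The first two files below are the Spring Batch job definitions; the last two are the original Oozie workflows they mirror.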

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring Batch version of the Oozie "processDirsWF" workflow.
  Oozie runs a java action (getDirs2Process) to list the directories, then forks one
  "processDir" sub-workflow per directory and joins before ending. Here a partitioner
  produces the partitions and each partition runs the "processDir" job as a job step.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:beans="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd">

  <job id="processDirsWF" xmlns="http://www.springframework.org/schema/batch">
    <listeners>
      <listener>
        <!-- NOTE(review): failureListener is not defined in this file; presumably declared elsewhere - TODO confirm -->
        <beans:ref bean="failureListener" />
      </listener>
    </listeners>
    <!-- No "next": this is the last step, so the job ends once every partition has
         finished (the Oozie equivalent is fork -> join -> end). -->
    <step id="forkSubWorkflows">
      <partition partitioner="dirs2ProcessPartitioner">
        <!-- Each partition runs the whole "processDir" job (Oozie: one sub-workflow per path). -->
        <step>
          <job ref="processDir" />
        </step>
        <!-- Run partitions concurrently, like the parallel paths of the Oozie fork.
             grid-size is the hint passed to Partitioner.partition(gridSize); 8 matches the
             eight fork paths in the Oozie workflow. -->
        <handler grid-size="8" task-executor="forkTaskExecutor" />
      </partition>
    </step>
  </job>

  <!-- Stands in for Oozie's getDirs2Process action (GenerateLookupDirs). inputBase/outputBase
       match the inputDir/outputDir prefixes used by the Oozie processDirN actions.
       Presumably yields one partition per directory to process - TODO confirm. -->
  <bean id="dirs2ProcessPartitioner" class="com.navteq.oozie.GenerateLookupDirsPartitioner">
    <property name="inputBase" value="hdfs://sachicn001:8020/user/data/probedev/files/" />
    <property name="outputBase" value="hdfs://sachicn001:8020/user/gtitievs/probe-output" />
  </bean>

  <!-- Executes each partition on its own thread. -->
  <bean id="forkTaskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
</beans>
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring Batch version of the Oozie "processDir" sub-workflow, for a single directory.
  getDirInfoListener stands in for Oozie's getDirInfo java action. The decision then routes,
  like the Oozie switch, to ingest -> archive, to sendEmail, or straight to the end of the job.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:beans="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd">

  <job id="processDir" xmlns="http://www.springframework.org/schema/batch">
    <listeners>
      <listener>
        <!-- NOTE(review): assumed to publish the "getDirInfo" values (dir.num-files, dir.age)
             into the job execution context read by makeIngestDecider; not defined in this
             file - TODO confirm. -->
        <beans:ref bean="getDirInfoListener" />
      </listener>
      <listener>
        <!-- NOTE(review): not defined in this file; presumably declared elsewhere. -->
        <beans:ref bean="failureListener" />
      </listener>
    </listeners>

    <!-- Same three-way routing as the Oozie "makeIngestDecision" switch. -->
    <decision id="makeIngestDecision" decider="makeIngestDecider">
      <!-- 24 or more files, or older than 6 days: ingest (Oozie case to="ingest"). -->
      <next on="COMPLETE" to="ingest" />
      <!-- Missing, or less than a day old with fewer than 24 files: nothing to do yet,
           stop (Oozie case to="end"). -->
      <end on="INCOMPLETE" />
      <!-- Anything else (1-6 days old with fewer than 24 files): alert by e-mail
           (Oozie default to="sendEmail"). -->
      <next on="*" to="sendEmail" />
    </decision>

    <step id="sendEmail">
      <tasklet ref="sendEmailTasklet" />
    </step>

    <step id="ingest" next="archive">
      <!-- This could be a Spring Hadoop M-R step -->
      <tasklet ref="ingestTasklet" />
    </step>

    <step id="archive">
      <tasklet ref="archiveTasklet" />
    </step>
  </job>

  <!-- NOTE(review): the routing above assumes MakeIngestDecider returns COMPLETE when
       completeExpression is true and INCOMPLETE when incompleteExpression is true - TODO confirm.
       The expressions use SpEL-style textual operators (gt, lt, and, or) and the same
       thresholds as the Oozie decision node. -->
  <bean id="makeIngestDecider" class="com.navteq.batch.MakeIngestDecider">
    <property name="inputBase" value="hdfs://sachicn001:8020/user/data/probedev/files/" />
    <property name="completeExpression">
      <value>
        jobExecutionContext['getDirInfo']['dir.num-files'] gt 23 or jobExecutionContext['getDirInfo']['dir.age'] gt 6
      </value>
    </property>
    <property name="incompleteExpression">
      <value>
        jobExecutionContext['getDirInfo']['dir.num-files'] lt 0 or (jobExecutionContext['getDirInfo']['dir.age'] lt 1 and jobExecutionContext['getDirInfo']['dir.num-files'] lt 24)
      </value>
    </property>
  </bean>

  <bean id="sendEmailTasklet" class="com.navteq.batch.SendEmailTasklet">
    <!-- properties elided in this sketch -->
  </bean>

  <bean id="ingestTasklet" class="com.navteq.batch.IngestTasklet">
    <!-- properties elided in this sketch -->
  </bean>

  <bean id="archiveTasklet" class="com.navteq.batch.ArchiveTasklet">
    <!-- properties elided in this sketch -->
  </bean>
</beans>
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Original Oozie "processDirsWF" workflow: list the directories to process, fork one
  "ingest" sub-workflow per directory, then join and end.
  The schemaLocation hint names the Oozie workflow-app schema.
-->
<workflow-app xmlns="uri:oozie:workflow:0.1" name="processDirsWF"
              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xsi:schemaLocation="uri:oozie:workflow:0.1 oozie-workflow-0.1.xsd">
  <start to="getDirs2Process" />

  <!-- STEP ONE -->
  <action name="getDirs2Process">
    <!-- Captures output properties dir0 .. dir7, read below through
         wf:actionData('getDirs2Process'). Each one names a directory, relative to the input and
         output base paths, for the matching processDirN action. -->
    <java>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <main-class>com.navteq.oozie.GenerateLookupDirs</main-class>
      <capture-output />
    </java>
    <ok to="forkSubWorkflows" />
    <error to="fail" />
  </action>

  <!-- One parallel path per captured directory. -->
  <fork name="forkSubWorkflows">
    <path start="processDir0" />
    <path start="processDir1" />
    <path start="processDir2" />
    <path start="processDir3" />
    <path start="processDir4" />
    <path start="processDir5" />
    <path start="processDir6" />
    <path start="processDir7" />
  </fork>

  <action name="processDir0">
    <sub-workflow>
      <app-path>hdfs://sachicn001:8020/user/gtitievs/workflows/ingest</app-path>
      <configuration>
        <property>
          <name>inputDir</name>
          <value>hdfs://sachicn001:8020/user/data/probedev/files/${wf:actionData('getDirs2Process')['dir0']}</value>
        </property>
        <property>
          <name>outputDir</name>
          <value>hdfs://sachicn001:8020/user/gtitievs/probe-output/${wf:actionData('getDirs2Process')['dir0']}</value>
        </property>
        <property>
          <name>jobTracker</name>
          <value>${jobTracker}</value>
        </property>
        <property>
          <name>nameNode</name>
          <value>${nameNode}</value>
        </property>
        <property>
          <name>activeDir</name>
          <value>hdfs://sachicn001:8020/user/gtitievs/test-activeDir</value>
        </property>
        <property>
          <name>dirName</name>
          <value>${wf:actionData('getDirs2Process')['dir0']}</value>
        </property>
      </configuration>
    </sub-workflow>
    <ok to="joining" />
    <error to="fail" />
  </action>

  <!-- processDir1 .. processDir6 are elided in this sketch. Each one matches processDir0,
       with the directory key changed to dir1 .. dir6. All eight must exist, because the fork
       above starts a path at each of them. -->

  <action name="processDir7">
    <sub-workflow>
      <app-path>hdfs://sachicn001:8020/user/gtitievs/workflows/ingest</app-path>
      <!-- Passes the same properties as processDir0; only the directory key differs. -->
      <configuration>
        <property>
          <name>inputDir</name>
          <value>hdfs://sachicn001:8020/user/data/probedev/files/${wf:actionData('getDirs2Process')['dir7']}</value>
        </property>
        <property>
          <name>outputDir</name>
          <value>hdfs://sachicn001:8020/user/gtitievs/probe-output/${wf:actionData('getDirs2Process')['dir7']}</value>
        </property>
        <property>
          <name>jobTracker</name>
          <value>${jobTracker}</value>
        </property>
        <property>
          <name>nameNode</name>
          <value>${nameNode}</value>
        </property>
        <property>
          <name>activeDir</name>
          <value>hdfs://sachicn001:8020/user/gtitievs/test-activeDir</value>
        </property>
        <property>
          <name>dirName</name>
          <value>${wf:actionData('getDirs2Process')['dir7']}</value>
        </property>
      </configuration>
    </sub-workflow>
    <ok to="joining" />
    <error to="fail" />
  </action>

  <!-- Waits for every forked path before ending. -->
  <join name="joining" to="end" />

  <kill name="fail">
    <message>Java failed, error
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
  </kill>

  <end name="end" />
</workflow-app>
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Original Oozie "processDir" sub-workflow, for a single directory (${inputDir}).
  It inspects the directory, then either ingests and archives it, sends an e-mail, or ends,
  depending on the file count and age. The schemaLocation hint names the Oozie workflow-app schema.
-->
<workflow-app xmlns="uri:oozie:workflow:0.1" name="processDir"
              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xsi:schemaLocation="uri:oozie:workflow:0.1 oozie-workflow-0.1.xsd">
  <start to="getDirInfo" />

  <!-- STEP ONE -->
  <action name="getDirInfo">
    <!-- Writes 2 properties about ${inputDir}:
           dir.num-files: -1 if the directory doesn't exist, otherwise the number of files in it
           dir.age:       -1 if the directory doesn't exist, otherwise its age in days -->
    <java>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <main-class>com.navteq.oozie.GetDirInfo</main-class>
      <arg>${inputDir}</arg>
      <capture-output />
    </java>
    <ok to="makeIngestDecision" />
    <error to="fail" />
  </action>

  <!-- STEP TWO -->
  <decision name="makeIngestDecision">
    <switch>
      <!-- Doesn't exist, or less than a day old with fewer than 24 files: nothing to do yet. -->
      <case to="end">
        ${wf:actionData('getDirInfo')['dir.num-files'] lt 0 ||
        (wf:actionData('getDirInfo')['dir.age'] lt 1 and
        wf:actionData('getDirInfo')['dir.num-files'] lt 24)}
      </case>
      <!-- 24 or more files, or older than 6 days: ingest. -->
      <case to="ingest">
        ${wf:actionData('getDirInfo')['dir.num-files'] gt 23 ||
        wf:actionData('getDirInfo')['dir.age'] gt 6}
      </case>
      <!-- Otherwise (1-6 days old with fewer than 24 files): alert by e-mail. -->
      <default to="sendEmail" />
    </switch>
  </decision>

  <!-- EMAIL -->
  <action name="sendEmail">
    <java>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <main-class>com.navteq.oozie.StandaloneMailer</main-class>
      <arg>probedata2@navteq.com</arg>
      <arg>gregory.titievsky@navteq.com</arg>
      <arg>${inputDir}</arg>
      <arg>${wf:actionData('getDirInfo')['dir.num-files']}</arg>
      <arg>${wf:actionData('getDirInfo')['dir.age']}</arg>
    </java>
    <ok to="end" />
    <error to="fail" />
  </action>

  <!-- INGESTION -->
  <action name="ingest">
    <java>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>
      <!-- Clear any previous output so the job can write ${outputDir} afresh. -->
      <prepare>
        <delete path="${outputDir}" />
      </prepare>
      <configuration>
        <property>
          <name>mapred.reduce.tasks</name>
          <value>300</value>
        </property>
      </configuration>
      <main-class>com.navteq.probedata.drivers.ProbeIngest</main-class>
      <arg>-conf</arg>
      <arg>action.xml</arg>
      <arg>${inputDir}</arg>
      <arg>${outputDir}</arg>
    </java>
    <!-- Transition targets must match a node name exactly (no surrounding whitespace). -->
    <ok to="archive-data" />
    <error to="ingest-fail" />
  </action>

  <!-- Archive Data -->
  <action name="archive-data">
    <fs>
      <move source="${inputDir}" target="/probe/backup/${dirName}" />
      <!-- NOTE(review): after the move above, ${inputDir} should no longer exist; confirm
           whether this delete is still needed. -->
      <delete path="${inputDir}" />
    </fs>
    <ok to="end" />
    <error to="ingest-fail" />
  </action>

  <kill name="ingest-fail">
    <message>Ingestion failed, error
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
  </kill>

  <kill name="fail">
    <message>Java failed, error
message[${wf:errorMessage(wf:lastErrorNode())}]</message>
  </kill>

  <end name="end" />
</workflow-app>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.