Skip to content

Instantly share code, notes, and snippets.

@ricston-git
Last active August 29, 2015 14:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ricston-git/11254966 to your computer and use it in GitHub Desktop.
Save ricston-git/11254966 to your computer and use it in GitHub Desktop.
The new batch module, polling with watermark and the new database connector for http://ricston.com/blog/mule-batch-polling-watermark-database/
<db:generic-config name="Generic_Database_Configuration" dataSource-ref="dataSource"/>
<db:parameterized-query><![CDATA[SELECT key1, key2 FROM table1 WHERE BETWEEN #[flowVars['Id'] +1 ] AND #[flowVars['Id'] + 10 ]]]></db:parameterized-query>
<batch:step name="BatchFailed" accept-policy="ONLY_FAILURES">
<logger doc:name="Logger" level="ERROR"
message="Record with the following payload has failed. Payload:: #[message.payload]" />
</batch:step>
<batch:commit doc:name="Batch Commit" size="10">
<db:insert config-ref="Generic_Database_Configuration" bulkMode="true" doc:name="Copy_of_Database">
<db:parameterized-query><![CDATA[INSERT into table2(KEY1, KEY2) values(#[payload['key1']], #[payload['key2']])]]></db:parameterized-query>
</db:insert>
</batch:commit>
<poll doc:name="Poll">
<fixed-frequency-scheduler frequency="1"
timeUnit="HOURS" />
<watermark variable="Id" default-expression="#[0]"
selector="MAX" selector-expression="#[message.payload['key1']]" />
<db:select config-ref="Generic_Database_Configuration"
doc:name="Database">
<db:parameterized-query><![CDATA[SELECT key1, key2 FROM table1 WHERE BETWEEN #[flowVars['Id'] +1 ] AND #[flowVars['Id'] + 10 ]]]></db:parameterized-query>
</db:select>
</poll>
<batch:on-complete>
<logger message="Number of failed Records: #[payload.failedRecords] "
level="INFO" doc:name="Failed Records" />
<logger message="Number of sucessfull Records: #[payload.successfulRecords]"
level="INFO" doc:name="Sucessfull Records" />
<logger message="ElapsedTime #[payload.getElapsedTimeInMillis()]"
level="INFO" doc:name="Elapsed Time" />
</batch:on-complete>
<watermark variable="Id" default-expression="#[0]"
selector="MAX" selector-expression="#[message.payload['key1']]" />
<spring:beans>
<spring:bean id="dataSource" class="org.springframework.jdbc.datasource.SimpleDriverDataSource">
<spring:property name="driverClass" value="org.apache.derby.jdbc.EmbeddedDriver"/>
<spring:property name="url" value="jdbc:derby:target/database/message;create=true"/>
<spring:property name="username" value="user"/>
<spring:property name="password" value="password"/>
</spring:bean>
</spring:beans>
<db:generic-config name="Generic_Database_Configuration"
dataSource-ref="dataSource" />
<batch:job name="batch-example-1">
<batch:threading-profile poolExhaustedAction="WAIT" />
<batch:input>
<poll doc:name="Poll">
<fixed-frequency-scheduler frequency="1"
timeUnit="HOURS" />
<watermark variable="Id" default-expression="#[0]"
selector="MAX" selector-expression="#[message.payload['key1']]" />
<db:select config-ref="Generic_Database_Configuration"
doc:name="Database">
<db:parameterized-query><![CDATA[SELECT key1, key2 FROM table1 WHERE key1 BETWEEN #[flowVars['Id'] +1 ] AND #[flowVars['Id'] + 10 ]]]></db:parameterized-query>
</db:select>
</poll>
</batch:input>
<batch:process-records>
<batch:step name="Batch_Step">
<batch:commit doc:name="Batch Commit" size="10">
<db:insert config-ref="Generic_Database_Configuration" bulkMode="true" >
<db:parameterized-query><![CDATA[INSERT into table2(KEY1, KEY2) values(#[payload['key1']], #[payload['key2']])]]></db:parameterized-query>
</db:insert>
</batch:commit>
</batch:step>
<batch:step name="BatchFailed" accept-policy="ONLY_FAILURES">
<logger doc:name="Logger" level="ERROR"
message="Record with the following payload has failed. Payload:: #[message.payload]" />
</batch:step>
</batch:process-records>
<batch:on-complete>
<logger message="Number of failed Records: #[payload.failedRecords] "
level="INFO" doc:name="Failed Records" />
<logger message="Number of sucessfull Records: #[payload.successfulRecords]"
level="INFO" doc:name="Sucessfull Records" />
<logger message="ElapsedTime #[payload.getElapsedTimeInMillis()]"
level="INFO" doc:name="Elapsed Time" />
</batch:on-complete>
</batch:job>
<batch:step name="Batch_Step">
<batch:commit size="10">
<db:insert config-ref="Generic_Database_Configuration"
doc:name="Database" bulkMode="true">
<db:parameterized-query><![CDATA[INSERT into table2(KEY1, KEY2) values(#[payload['key1']], #[payload['key2']])]]></db:parameterized-query>
</db:insert>
</batch:commit>
</batch:step>
<db:select config-ref="Generic_Database_Configuration">
<db:parameterized-query><![CDATA[SELECT key1, key2 FROM table1 WHERE key1 BETWEEN #[flowVars['Id'] +1 ] AND #[flowVars['Id'] + 10 ] order by key1 asc]]></db:parameterized-query>
</db:select>
<choice>
<when expression="#[payload.size() > 0]">
<set-variable variableName="myId" value="#[payload[payload.size()-1]['key1']]" />
</when>
<otherwise>
<set-variable variableName="myId" value="#[0]" doc:name="set id" />
</otherwise>
</choice>
<watermark variable="Id" default-expression="#[0]" update-expression="#[flowVars['myId']]" />
<spring:beans>
<spring:bean id="dataSource" class="org.springframework.jdbc.datasource.SimpleDriverDataSource">
<spring:property name="driverClass" value="org.apache.derby.jdbc.EmbeddedDriver"/>
<spring:property name="url" value="jdbc:derby:target/database/message;create=true"/>
<spring:property name="username" value="user"/>
<spring:property name="password" value="password"/>
</spring:bean>
</spring:beans>
<db:generic-config name="Generic_Database_Configuration"
dataSource-ref="dataSource" doc:name="Generic Database Configuration" />
<batch:job name="batch-example-two">
<batch:input>
<poll>
<schedulers:cron-scheduler expression="0 0/1 * 1/1 * ? *" />
<watermark variable="Id" default-expression="#[0]"
update-expression="#[flowVars['myId']]" />
<db:select config-ref="Generic_Database_Configuration"
doc:name="Database">
<db:parameterized-query><![CDATA[SELECT key1, key2 FROM table1 WHERE key1 BETWEEN #[flowVars['Id'] +1 ] AND #[flowVars['Id'] + 10 ] order by key1 asc]]></db:parameterized-query>
</db:select>
</poll>
<component class="com.ricston.blog.examples.StopScheduler"
doc:name="Stop scheduler" />
<choice>
<when expression="#[payload.size() > 0]">
<set-variable variableName="myId" value="#[payload[payload.size()-1]['key1']]" doc:name="set id" />
</when>
<otherwise>
<set-variable variableName="myId" value="#[0]" doc:name="set id" />
</otherwise>
</choice>
</batch:input>
<batch:process-records>
<batch:step name="Batch_Step">
<batch:commit size="10">
<db:insert config-ref="Generic_Database_Configuration"
doc:name="Database" bulkMode="true">
<db:parameterized-query><![CDATA[INSERT into table2(KEY1, KEY2) values(#[payload['key1']], #[payload['key2']])]]></db:parameterized-query>
</db:insert>
</batch:commit>
</batch:step>
<batch:step name="BatchFailed" accept-policy="ONLY_FAILURES">
<logger doc:name="Logger" level="ERROR"
message="Record with the following payload has failed. Payload:: #[message.payload]" />
</batch:step>
</batch:process-records>
<batch:on-complete>
<logger level="INFO" doc:name="Logger" message="Stopping Inputblock" />
<choice>
<when expression="#[payload.getProcessedRecords() &gt; 0]">
<component class="com.ricston.blog.example.ScheduleScheduler"
doc:name="Schedule Scheduler" />
<component class="com.ricston.blog.example.StartScheduler"
doc:name="Start Scheduler" />
</when>
<otherwise>
<!-- get scheduler running -->
<component class="com.ricston.blog.example.StartScheduler"
doc:name="Start Scheduler" />
</otherwise>
</choice>
<logger message="Number of failed Records: #[payload.failedRecords] "
level="INFO" doc:name="Failed Records" />
<logger message="Number of sucessfull Records: #[payload.successfulRecords]"
level="INFO" doc:name="Sucessfull Records" />
<logger message="ElapsedTime #[payload.getElapsedTimeInMillis()]"
level="INFO" doc:name="Elapsed Time" />
</batch:on-complete>
</batch:job>
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:core="http://www.mulesoft.org/schema/mule/core"
xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting"
xmlns:batch="http://www.mulesoft.org/schema/mule/batch" xmlns:db="http://www.mulesoft.org/schema/mule/db"
xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans" version="EE-3.5.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd
http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd">
<spring:beans>
<spring:bean id="dataSource"
class="org.springframework.jdbc.datasource.SimpleDriverDataSource">
<spring:property name="driverClass" value="org.postgresql.Driver" />
<spring:property name="url"
value="jdbc:postgresql://localhost/mydatabase" />
<spring:property name="username" value="username" />
<spring:property name="password" value="password" />
</spring:bean>
</spring:beans>
<db:generic-config name="Generic_Database_Configuration"
dataSource-ref="dataSource" doc:name="Generic Database Configuration" />
<batch:job name="batch-example-1" scheduling-strategy="ROUND_ROBIN">
<batch:threading-profile poolExhaustedAction="WAIT" />
<batch:input>
<poll doc:name="Poll" >
<fixed-frequency-scheduler frequency="10"
timeUnit="SECONDS" />
<watermark variable="Id" default-expression="#[0]"
selector="MAX" selector-expression="#[message.payload['key1']]" />
<processor-chain doc:name="Processor Chain">
<db:select config-ref="Generic_Database_Configuration"
doc:name="Database">
<db:parameterized-query><![CDATA[SELECT key1, key2 FROM table1 WHERE key1 BETWEEN #[flowVars['Id'] +1 ] AND #[flowVars['Id'] + 10 ]]]></db:parameterized-query>
</db:select>
<logger level="ERROR" message="current input phase #[payload]"
doc:name="Logger" />
</processor-chain>
</poll>
</batch:input>
<batch:process-records>
<batch:step name="sleppThreadForFirstKey">
<scripting:component doc:name="Groovy">
<scripting:script engine="Groovy"><![CDATA[java.util.Map record = (java.util.Map) eventContext.getMessage().getPayload();
String key1 = record.get("key1").toString();
if (key1.equals("1")) {
Thread.sleep(20000);
}
return message.payload;
]]></scripting:script>
</scripting:component>
</batch:step>
<batch:step name="Batch_Step">
<batch:commit doc:name="Batch Commit" size="10">
<db:insert config-ref="Generic_Database_Configuration"
bulkMode="true" doc:name="Database">
<db:parameterized-query><![CDATA[INSERT into table2(KEY1, KEY2) values(#[payload['key1']], #[payload['key2']])]]></db:parameterized-query>
</db:insert>
</batch:commit>
</batch:step>
<batch:step name="BatchFailed" accept-policy="ONLY_FAILURES">
<logger doc:name="Logger" level="ERROR"
message="Record with the following payload has failed. Payload:: #[message.payload]" />
</batch:step>
</batch:process-records>
<batch:on-complete>
<logger message="Number of failed Records: #[payload.failedRecords] "
level="ERROR" doc:name="Failed Records" />
<logger message="Number of sucessfull Records: #[payload.successfulRecords]"
level="ERROR" doc:name="Sucessfull Records" />
<logger message="ElapsedTime #[payload.getElapsedTimeInMillis()]"
level="ERROR" doc:name="Elapsed Time" />
</batch:on-complete>
</batch:job>
</mule>
ERROR 2014-05-17 16:09:28,856 [pool-12-thread-1] org.mule.api.processor.LoggerMessageProcessor: current input phase [{key1=1, key2=1}, {key1=2, key2=2}, {key1=3, key2=3}, {key1=4, key2=4}, {key1=5, key2=5}, {key1=6, key2=6}, {key1=7, key2=7}, {key1=8, key2=7}, {key1=9, key2=7}, {key1=10, key2=7}]
INFO 2014-05-17 16:09:29,070 [pool-12-thread-1] com.mulesoft.module.batch.engine.DefaultBatchEngine: Created instance dc38e666-ddcc-11e3-a66a-87bfac71d161 for batch job batch-example-1
INFO 2014-05-17 16:09:29,073 [pool-12-thread-1] com.mulesoft.module.batch.engine.DefaultBatchEngine: Starting input phase
INFO 2014-05-17 16:09:29,073 [pool-12-thread-1] com.mulesoft.module.batch.engine.DefaultBatchEngine: Input phase completed
INFO 2014-05-17 16:09:29,120 [pool-12-thread-1] com.mulesoft.module.batch.engine.queue.BatchQueueLoader: Starting loading phase for instance dc38e666-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1
INFO 2014-05-17 16:09:29,150 [pool-12-thread-1] com.mulesoft.module.batch.engine.queue.BatchQueueLoader: Finished loading phase for instance dc38e666-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1. 10 records were loaded
INFO 2014-05-17 16:09:29,154 [pool-12-thread-1] com.mulesoft.module.batch.engine.DefaultBatchEngine: Started execution of instance dc38e666-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1
ERROR 2014-05-17 16:09:36,773 [pool-12-thread-1] org.mule.api.processor.LoggerMessageProcessor: current input phase [{key1=11, key2=7}, {key1=12, key2=7}, {key1=13, key2=7}, {key1=14, key2=7}, {key1=15, key2=7}, {key1=16, key2=7}, {key1=17, key2=7}, {key1=18, key2=7}, {key1=19, key2=7}, {key1=20, key2=7}]
INFO 2014-05-17 16:09:36,804 [pool-12-thread-1] com.mulesoft.module.batch.engine.DefaultBatchEngine: Created instance e0ead4c9-ddcc-11e3-a66a-87bfac71d161 for batch job batch-example-1
INFO 2014-05-17 16:09:36,805 [pool-12-thread-1] com.mulesoft.module.batch.engine.DefaultBatchEngine: Starting input phase
INFO 2014-05-17 16:09:36,805 [pool-12-thread-1] com.mulesoft.module.batch.engine.DefaultBatchEngine: Input phase completed
INFO 2014-05-17 16:09:36,809 [pool-12-thread-1] com.mulesoft.module.batch.engine.queue.BatchQueueLoader: Starting loading phase for instance e0ead4c9-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1
INFO 2014-05-17 16:09:36,812 [pool-12-thread-1] com.mulesoft.module.batch.engine.queue.BatchQueueLoader: Finished loading phase for instance e0ead4c9-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1. 10 records were loaded
INFO 2014-05-17 16:09:36,815 [pool-12-thread-1] com.mulesoft.module.batch.engine.DefaultBatchEngine: Started execution of instance e0ead4c9-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1
INFO 2014-05-17 16:09:37,027 [batch-job-batch-example-1-work-manager.02] com.mulesoft.module.batch.DefaultBatchStep: Step sleppThreadForFirstKey finished processing all records for instance e0ead4c9-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1
INFO 2014-05-17 16:09:38,854 [batch-job-batch-example-1-work-manager.02] com.mulesoft.module.batch.DefaultBatchStep: Step Batch_Step finished processing all records for instance e0ead4c9-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1
INFO 2014-05-17 16:09:40,741 [batch-job-batch-example-1-work-manager.02] com.mulesoft.module.batch.DefaultBatchStep: Step BatchFailed finished processing all records for instance e0ead4c9-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1
INFO 2014-05-17 16:09:40,743 [[testbatch].Batch Dispatcher thread] com.mulesoft.module.batch.engine.DefaultBatchEngine: Finished execution for instance e0ead4c9-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1
INFO 2014-05-17 16:09:40,743 [[testbatch].Batch Dispatcher thread] com.mulesoft.module.batch.engine.DefaultBatchEngine: Starting execution of onComplete phase for instance e0ead4c9-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1
ERROR 2014-05-17 16:09:40,748 [[testbatch].Batch Dispatcher thread] org.mule.api.processor.LoggerMessageProcessor: Number of failed Records: 0
ERROR 2014-05-17 16:09:40,752 [[testbatch].Batch Dispatcher thread] org.mule.api.processor.LoggerMessageProcessor: Number of sucessfull Records: 10
ERROR 2014-05-17 16:09:40,776 [[testbatch].Batch Dispatcher thread] org.mule.api.processor.LoggerMessageProcessor: ElapsedTime 3931
INFO 2014-05-17 16:09:40,777 [[testbatch].Batch Dispatcher thread] com.mulesoft.module.batch.engine.DefaultBatchEngine: Finished execution of onComplete phase for instance e0ead4c9-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1
ERROR 2014-05-17 16:09:46,773 [pool-12-thread-1] org.mule.api.processor.LoggerMessageProcessor: current input phase [{key1=21, key2=7}, {key1=22, key2=7}, {key1=23, key2=7}, {key1=24, key2=7}, {key1=25, key2=7}, {key1=26, key2=7}, {key1=27, key2=7}, {key1=28, key2=7}, {key1=29, key2=7}, {key1=30, key2=7}]
INFO 2014-05-17 16:09:46,786 [pool-12-thread-1] com.mulesoft.module.batch.engine.DefaultBatchEngine: Created instance e6e12b6d-ddcc-11e3-a66a-87bfac71d161 for batch job batch-example-1
INFO 2014-05-17 16:09:46,786 [pool-12-thread-1] com.mulesoft.module.batch.engine.DefaultBatchEngine: Starting input phase
INFO 2014-05-17 16:09:46,786 [pool-12-thread-1] com.mulesoft.module.batch.engine.DefaultBatchEngine: Input phase completed
INFO 2014-05-17 16:09:46,790 [pool-12-thread-1] com.mulesoft.module.batch.engine.queue.BatchQueueLoader: Starting loading phase for instance e6e12b6d-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1
INFO 2014-05-17 16:09:46,792 [pool-12-thread-1] com.mulesoft.module.batch.engine.queue.BatchQueueLoader: Finished loading phase for instance e6e12b6d-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1. 10 records were loaded
INFO 2014-05-17 16:09:46,795 [pool-12-thread-1] com.mulesoft.module.batch.engine.DefaultBatchEngine: Started execution of instance e6e12b6d-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1
INFO 2014-05-17 16:09:46,824 [batch-job-batch-example-1-work-manager.03] com.mulesoft.module.batch.DefaultBatchStep: Step sleppThreadForFirstKey finished processing all records for instance e6e12b6d-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1
INFO 2014-05-17 16:09:48,773 [batch-job-batch-example-1-work-manager.05] com.mulesoft.module.batch.DefaultBatchStep: Step Batch_Step finished processing all records for instance e6e12b6d-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1
INFO 2014-05-17 16:09:50,674 [batch-job-batch-example-1-work-manager.01] com.mulesoft.module.batch.DefaultBatchStep: Step sleppThreadForFirstKey finished processing all records for instance dc38e666-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1
INFO 2014-05-17 16:09:50,748 [batch-job-batch-example-1-work-manager.03] com.mulesoft.module.batch.DefaultBatchStep: Step BatchFailed finished processing all records for instance e6e12b6d-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1
INFO 2014-05-17 16:09:50,750 [[testbatch].Batch Dispatcher thread] com.mulesoft.module.batch.engine.DefaultBatchEngine: Finished execution for instance e6e12b6d-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1
INFO 2014-05-17 16:09:50,750 [[testbatch].Batch Dispatcher thread] com.mulesoft.module.batch.engine.DefaultBatchEngine: Starting execution of onComplete phase for instance e6e12b6d-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1
ERROR 2014-05-17 16:09:50,751 [[testbatch].Batch Dispatcher thread] org.mule.api.processor.LoggerMessageProcessor: Number of failed Records: 0
ERROR 2014-05-17 16:09:50,751 [[testbatch].Batch Dispatcher thread] org.mule.api.processor.LoggerMessageProcessor: Number of sucessfull Records: 10
ERROR 2014-05-17 16:09:50,752 [[testbatch].Batch Dispatcher thread] org.mule.api.processor.LoggerMessageProcessor: ElapsedTime 3958
INFO 2014-05-17 16:09:50,752 [[testbatch].Batch Dispatcher thread] com.mulesoft.module.batch.engine.DefaultBatchEngine: Finished execution of onComplete phase for instance e6e12b6d-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1
INFO 2014-05-17 16:09:51,789 [batch-job-batch-example-1-work-manager.04] com.mulesoft.module.batch.DefaultBatchStep: Step Batch_Step finished processing all records for instance dc38e666-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1
INFO 2014-05-17 16:09:53,757 [batch-job-batch-example-1-work-manager.02] com.mulesoft.module.batch.DefaultBatchStep: Step BatchFailed finished processing all records for instance dc38e666-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1
INFO 2014-05-17 16:09:53,758 [[testbatch].Batch Dispatcher thread] com.mulesoft.module.batch.engine.DefaultBatchEngine: Finished execution for instance dc38e666-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1
INFO 2014-05-17 16:09:53,759 [[testbatch].Batch Dispatcher thread] com.mulesoft.module.batch.engine.DefaultBatchEngine: Starting execution of onComplete phase for instance dc38e666-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1
ERROR 2014-05-17 16:09:53,759 [[testbatch].Batch Dispatcher thread] org.mule.api.processor.LoggerMessageProcessor: Number of failed Records: 0
ERROR 2014-05-17 16:09:53,760 [[testbatch].Batch Dispatcher thread] org.mule.api.processor.LoggerMessageProcessor: Number of sucessfull Records: 10
ERROR 2014-05-17 16:09:53,760 [[testbatch].Batch Dispatcher thread] org.mule.api.processor.LoggerMessageProcessor: ElapsedTime 24607
<poll>
<schedulers:cron-scheduler expression="0 0/1 * 1/1 * ? *" />
</poll>
<poll>
<fixed-frequency-scheduler frequency="1" timeUnit="MINUTES" startDelay="0"/>
</poll>
<poll>
<fixed-frequency-scheduler frequency="1" timeUnit="MINUTES" />
<watermark variable="Id" default-expression="#[0]" selector="MAX" selector-expression="#[payload['id']]" />
</poll>
<poll>
<fixed-frequency-scheduler frequency="1" timeUnit="MINUTES" />
<watermark variable="Id" default-expression="#[0]" update-expression="#[flowVars['myId']]" />
</poll>...
<set-variable variableName="myId" value="#[payload[payload.size()-1]['id']]" />
<poll doc:name="Poll">
<fixed-frequency-scheduler frequency="1" timeUnit="HOURS"/>
<db:select config-ref="Generic_Database_Configuration">
<db:parameterized-query><![CDATA[SELECT KEY1, KEY2 FROM table1 ]]></db:parameterized-query>
</db:select>
</poll>
package com.ricston.blog.example;
import java.util.Collection;
import org.mule.api.MuleContext;
import org.mule.api.MuleEventContext;
import org.mule.api.MuleException;
import org.mule.api.schedule.Scheduler;
import org.mule.modules.schedulers.cron.CronScheduler;
import org.mule.transport.polling.schedule.FixedFrequencyScheduler;
import org.mule.util.Predicate;
public class SchedulerWrapper{
protected Scheduler getScheduler(MuleEventContext eventContext) {
MuleContext context = eventContext.getMuleContext();
final String flowName = eventContext.getFlowConstruct().getName();
// Get poll scheduler so as to stop and start and poll the endpoint.
Collection<Scheduler> pollSchedulers = context.getRegistry()
.lookupScheduler(new Predicate<String>() {
@Override
public boolean evaluate(String s) {
// the scheduler name should be polling:// + flow
// name/batchjob name + /..
if (s.startsWith("polling://" + flowName + "/")) {
return true;
} else {
return false;
}
}
});
// There should be only one scheduler
if (pollSchedulers.size() == 1) {
Scheduler pollScheduler = pollSchedulers.iterator().next();
return pollScheduler;
} else {
throw new IllegalStateException("Was expecting one scheduler but there were:" + pollSchedulers.size());
}
}
protected void stopScheduler(Scheduler scheduler) throws MuleException {
if (scheduler instanceof FixedFrequencyScheduler) {
@SuppressWarnings("rawtypes")
FixedFrequencyScheduler fixedFrequencyScheduler = (FixedFrequencyScheduler) scheduler;
fixedFrequencyScheduler.stop();
} else if (scheduler instanceof CronScheduler) {
CronScheduler cronScheduler = (CronScheduler) scheduler;
cronScheduler.stop();
} else {
throw new IllegalArgumentException(
"Expected instance of org.mule.transport.polling.schedule.FixedFrequencyScheduler or org.mule.transport.polling.schedule.CronScheduler, but argument was "
+ scheduler.getClass());
}
}
protected void startScheduler(Scheduler scheduler) throws MuleException {
if (scheduler instanceof FixedFrequencyScheduler) {
@SuppressWarnings("rawtypes")
FixedFrequencyScheduler fixedFrequencyScheduler = (FixedFrequencyScheduler) scheduler;
fixedFrequencyScheduler.start();
} else if (scheduler instanceof CronScheduler) {
CronScheduler cronScheduler = (CronScheduler) scheduler;
cronScheduler.start();
} else {
throw new IllegalArgumentException(
"Expected instance of org.mule.transport.polling.schedule.FixedFrequencyScheduler or org.mule.transport.polling.schedule.CronScheduler, but argument was " + scheduler.getClass());
}
}
protected void scheduleScheduler(Scheduler scheduler) throws Exception{
if (scheduler instanceof FixedFrequencyScheduler) {
@SuppressWarnings("rawtypes")
FixedFrequencyScheduler fixedFrequencyScheduler = (FixedFrequencyScheduler) scheduler;
fixedFrequencyScheduler.schedule();
} else if (scheduler instanceof CronScheduler) {
CronScheduler cronScheduler = (CronScheduler) scheduler;
cronScheduler.schedule();
} else {
throw new IllegalArgumentException(
"Expected instance of org.mule.transport.polling.schedule.FixedFrequencyScheduler or org.mule.transport.polling.schedule.CronScheduler, but argument was " + scheduler.getClass());
}
}
}
package com.ricston.blog.example;
import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;
import org.mule.api.schedule.Scheduler;
public class ScheduleScheduler extends SchedulerWrapper implements Callable {
@Override
public Object onCall(MuleEventContext eventContext) throws Exception {
Scheduler scheduler = getScheduler(eventContext);
this.scheduleScheduler(scheduler);
return eventContext.getMessage().getPayload();
}
}
<spring:beans>
<spring:bean id="dataSource" class="org.springframework.jdbc.datasource.SimpleDriverDataSource">
<spring:property name="driverClass" value="org.apache.derby.jdbc.EmbeddedDriver"/>
<spring:property name="url" value="jdbc:derby:target/database/message;create=true"/>
<spring:property name="username" value="user"/>
<spring:property name="password" value="password"/>
</spring:bean>
</spring:beans>
package com.ricston.blog.example;
import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;
import org.mule.api.schedule.Scheduler;
public class StartScheduler extends SchedulerWrapper implements Callable {
@Override
public Object onCall(MuleEventContext eventContext) throws Exception {
Scheduler scheduler = this.getScheduler(eventContext);
this.startScheduler(scheduler);
return eventContext.getMessage().getPayload();
}
}
package com.ricston.blog.example;
import org.mule.api.MuleEventContext;
import org.mule.api.lifecycle.Callable;
import org.mule.api.schedule.Scheduler;
public class StopScheduler extends SchedulerWrapper implements Callable {
@Override
public Object onCall(MuleEventContext eventContext) throws Exception {
Scheduler scheduler = this.getScheduler(eventContext);
this.stopScheduler(scheduler);
return eventContext.getMessage().getPayload();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment