Last active
August 29, 2015 14:00
-
-
Save ricston-git/11254966 to your computer and use it in GitHub Desktop.
The new batch module, polling with watermark and the new database connector for http://ricston.com/blog/mule-batch-polling-watermark-database/
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Database connector configuration named "Generic_Database_Configuration"; it points at the bean whose id is "dataSource". -->
<db:generic-config name="Generic_Database_Configuration" dataSource-ref="dataSource"/> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Selects the window of rows whose key1 lies between Id+1 and Id+10, where "Id" is a flow variable.
     The BETWEEN predicate needs the compared column (key1) on its left-hand side; without it the
     statement is not valid SQL. -->
<db:parameterized-query><![CDATA[SELECT key1, key2 FROM table1 WHERE key1 BETWEEN #[flowVars['Id'] +1 ] AND #[flowVars['Id'] + 10 ]]]></db:parameterized-query>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Batch step restricted by accept-policy="ONLY_FAILURES"; it logs the record payload at ERROR level. -->
<batch:step name="BatchFailed" accept-policy="ONLY_FAILURES"> | |
<logger doc:name="Logger" level="ERROR" | |
message="Record with the following payload has failed. Payload:: #[message.payload]" /> | |
</batch:step> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Batch commit of size 10 wrapping a bulk-mode insert into table2; the KEY1/KEY2 values are read
     from payload['key1'] and payload['key2']. -->
<batch:commit doc:name="Batch Commit" size="10"> | |
<db:insert config-ref="Generic_Database_Configuration" bulkMode="true" doc:name="Copy_of_Database"> | |
<db:parameterized-query><![CDATA[INSERT into table2(KEY1, KEY2) values(#[payload['key1']], #[payload['key2']])]]></db:parameterized-query> | |
</db:insert> | |
</batch:commit> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Polls table1 on a fixed one-hour frequency. The watermark variable "Id" starts from #[0] and is
     derived with the MAX selector over payload['key1']; each poll selects the rows whose key1 lies
     between Id+1 and Id+10. -->
<poll doc:name="Poll">
    <fixed-frequency-scheduler frequency="1"
        timeUnit="HOURS" />
    <watermark variable="Id" default-expression="#[0]"
        selector="MAX" selector-expression="#[message.payload['key1']]" />
    <db:select config-ref="Generic_Database_Configuration"
        doc:name="Database">
        <!-- BETWEEN requires the compared column (key1) as its left operand. -->
        <db:parameterized-query><![CDATA[SELECT key1, key2 FROM table1 WHERE key1 BETWEEN #[flowVars['Id'] +1 ] AND #[flowVars['Id'] + 10 ]]]></db:parameterized-query>
    </db:select>
</poll>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- On-complete phase: logs payload.failedRecords, payload.successfulRecords and
     payload.getElapsedTimeInMillis() at INFO level. -->
<batch:on-complete> | |
<logger message="Number of failed Records: #[payload.failedRecords] " | |
level="INFO" doc:name="Failed Records" /> | |
<logger message="Number of sucessfull Records: #[payload.successfulRecords]" | |
level="INFO" doc:name="Sucessfull Records" /> | |
<logger message="ElapsedTime #[payload.getElapsedTimeInMillis()]" | |
level="INFO" doc:name="Elapsed Time" /> | |
</batch:on-complete> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Watermark held in variable "Id" with default #[0]; the MAX selector is evaluated over
     payload['key1']. Presumably the largest key1 of a poll becomes the next "Id" (confirm against
     the Mule watermark documentation). -->
<watermark variable="Id" default-expression="#[0]" | |
selector="MAX" selector-expression="#[message.payload['key1']]" /> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Spring bean "dataSource": a SimpleDriverDataSource using the embedded Derby driver; the JDBC URL
     carries create=true. Credentials are written inline as example values. The db:generic-config
     below exposes this bean as "Generic_Database_Configuration". -->
<spring:beans> | |
<spring:bean id="dataSource" class="org.springframework.jdbc.datasource.SimpleDriverDataSource"> | |
<spring:property name="driverClass" value="org.apache.derby.jdbc.EmbeddedDriver"/> | |
<spring:property name="url" value="jdbc:derby:target/database/message;create=true"/> | |
<spring:property name="username" value="user"/> | |
<spring:property name="password" value="password"/> | |
</spring:bean> | |
</spring:beans> | |
<db:generic-config name="Generic_Database_Configuration" | |
dataSource-ref="dataSource" /> | |
<!-- Batch job "batch-example-1".
     Input: an hourly poll selecting rows of table1 whose key1 lies between Id+1 and Id+10, with a
     MAX watermark over payload['key1'] stored in variable "Id".
     Process: "Batch_Step" bulk-inserts records into table2 in commits of 10; "BatchFailed" accepts
     only failures and logs their payload at ERROR level.
     On complete: logs failed/successful record counts and the elapsed time at INFO level. -->
<batch:job name="batch-example-1"> | |
<batch:threading-profile poolExhaustedAction="WAIT" /> | |
<batch:input> | |
<poll doc:name="Poll"> | |
<fixed-frequency-scheduler frequency="1" | |
timeUnit="HOURS" /> | |
<watermark variable="Id" default-expression="#[0]" | |
selector="MAX" selector-expression="#[message.payload['key1']]" /> | |
<db:select config-ref="Generic_Database_Configuration" | |
doc:name="Database"> | |
<db:parameterized-query><![CDATA[SELECT key1, key2 FROM table1 WHERE key1 BETWEEN #[flowVars['Id'] +1 ] AND #[flowVars['Id'] + 10 ]]]></db:parameterized-query> | |
</db:select> | |
</poll> | |
</batch:input> | |
<batch:process-records> | |
<batch:step name="Batch_Step"> | |
<batch:commit doc:name="Batch Commit" size="10"> | |
<db:insert config-ref="Generic_Database_Configuration" bulkMode="true" > | |
<db:parameterized-query><![CDATA[INSERT into table2(KEY1, KEY2) values(#[payload['key1']], #[payload['key2']])]]></db:parameterized-query> | |
</db:insert> | |
</batch:commit> | |
</batch:step> | |
<batch:step name="BatchFailed" accept-policy="ONLY_FAILURES"> | |
<logger doc:name="Logger" level="ERROR" | |
message="Record with the following payload has failed. Payload:: #[message.payload]" /> | |
</batch:step> | |
</batch:process-records> | |
<batch:on-complete> | |
<logger message="Number of failed Records: #[payload.failedRecords] " | |
level="INFO" doc:name="Failed Records" /> | |
<logger message="Number of sucessfull Records: #[payload.successfulRecords]" | |
level="INFO" doc:name="Sucessfull Records" /> | |
<logger message="ElapsedTime #[payload.getElapsedTimeInMillis()]" | |
level="INFO" doc:name="Elapsed Time" /> | |
</batch:on-complete> | |
</batch:job> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Batch step "Batch_Step": inside a commit of size 10, performs a bulk-mode insert of
     payload['key1'] / payload['key2'] into table2. -->
<batch:step name="Batch_Step"> | |
<batch:commit size="10"> | |
<db:insert config-ref="Generic_Database_Configuration" | |
doc:name="Database" bulkMode="true"> | |
<db:parameterized-query><![CDATA[INSERT into table2(KEY1, KEY2) values(#[payload['key1']], #[payload['key2']])]]></db:parameterized-query> | |
</db:insert> | |
</batch:commit> | |
</batch:step> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Selects rows of table1 with key1 between Id+1 and Id+10, ordered by key1 ascending, so the last
     row of the result carries the largest key1. -->
<db:select config-ref="Generic_Database_Configuration"> | |
<db:parameterized-query><![CDATA[SELECT key1, key2 FROM table1 WHERE key1 BETWEEN #[flowVars['Id'] +1 ] AND #[flowVars['Id'] + 10 ] order by key1 asc]]></db:parameterized-query> | |
</db:select> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Sets flow variable "myId": when the payload is non-empty, to the key1 of its last element;
     otherwise to 0.
     NOTE(review): if "myId" feeds a watermark update-expression, the 0 fallback would presumably
     reset the watermark whenever a poll returns no rows; confirm this is intended. -->
<choice> | |
<when expression="#[payload.size() > 0]"> | |
<set-variable variableName="myId" value="#[payload[payload.size()-1]['key1']]" /> | |
</when> | |
<otherwise> | |
<set-variable variableName="myId" value="#[0]" doc:name="set id" /> | |
</otherwise> | |
</choice> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Watermark "Id" with default #[0]; its update-expression reads the flow variable "myId" instead of using a selector. -->
<watermark variable="Id" default-expression="#[0]" update-expression="#[flowVars['myId']]" /> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Spring bean "dataSource" (SimpleDriverDataSource, embedded Derby driver, inline example
     credentials) and the db:generic-config "Generic_Database_Configuration" that references it. -->
<spring:beans> | |
<spring:bean id="dataSource" class="org.springframework.jdbc.datasource.SimpleDriverDataSource"> | |
<spring:property name="driverClass" value="org.apache.derby.jdbc.EmbeddedDriver"/> | |
<spring:property name="url" value="jdbc:derby:target/database/message;create=true"/> | |
<spring:property name="username" value="user"/> | |
<spring:property name="password" value="password"/> | |
</spring:bean> | |
</spring:beans> | |
<db:generic-config name="Generic_Database_Configuration" | |
dataSource-ref="dataSource" doc:name="Generic Database Configuration" /> | |
<!-- Batch job "batch-example-two".
     Input: a cron-scheduled poll selecting rows of table1 with key1 between Id+1 and Id+10 in
     ascending order; the watermark "Id" is updated from flow variable "myId". After the poll, the
     StopScheduler component runs, and "myId" is set to the key1 of the last row (or 0 when the
     payload is empty).
     Process: bulk insert into table2 in commits of 10; failed records are logged at ERROR level.
     On complete: the scheduler components run again (ScheduleScheduler + StartScheduler when at
     least one record was processed, StartScheduler alone otherwise), then the record counts and
     elapsed time are logged. -->
<batch:job name="batch-example-two">
    <batch:input>
        <poll>
            <schedulers:cron-scheduler expression="0 0/1 * 1/1 * ? *" />
            <watermark variable="Id" default-expression="#[0]"
                update-expression="#[flowVars['myId']]" />
            <db:select config-ref="Generic_Database_Configuration"
                doc:name="Database">
                <db:parameterized-query><![CDATA[SELECT key1, key2 FROM table1 WHERE key1 BETWEEN #[flowVars['Id'] +1 ] AND #[flowVars['Id'] + 10 ] order by key1 asc]]></db:parameterized-query>
            </db:select>
        </poll>
        <!-- The scheduler helper classes are declared in package com.ricston.blog.example
             (singular), matching the components used in the on-complete phase. -->
        <component class="com.ricston.blog.example.StopScheduler"
            doc:name="Stop scheduler" />
        <choice>
            <when expression="#[payload.size() > 0]">
                <set-variable variableName="myId" value="#[payload[payload.size()-1]['key1']]" doc:name="set id" />
            </when>
            <otherwise>
                <!-- NOTE(review): falling back to 0 presumably resets the watermark when a poll
                     returns no rows; confirm this is intended. -->
                <set-variable variableName="myId" value="#[0]" doc:name="set id" />
            </otherwise>
        </choice>
    </batch:input>
    <batch:process-records>
        <batch:step name="Batch_Step">
            <batch:commit size="10">
                <db:insert config-ref="Generic_Database_Configuration"
                    doc:name="Database" bulkMode="true">
                    <db:parameterized-query><![CDATA[INSERT into table2(KEY1, KEY2) values(#[payload['key1']], #[payload['key2']])]]></db:parameterized-query>
                </db:insert>
            </batch:commit>
        </batch:step>
        <batch:step name="BatchFailed" accept-policy="ONLY_FAILURES">
            <logger doc:name="Logger" level="ERROR"
                message="Record with the following payload has failed. Payload:: #[message.payload]" />
        </batch:step>
    </batch:process-records>
    <batch:on-complete>
        <logger level="INFO" doc:name="Logger" message="Stopping Inputblock" />
        <choice>
            <when expression="#[payload.getProcessedRecords() > 0]">
                <component class="com.ricston.blog.example.ScheduleScheduler"
                    doc:name="Schedule Scheduler" />
                <component class="com.ricston.blog.example.StartScheduler"
                    doc:name="Start Scheduler" />
            </when>
            <otherwise>
                <!-- get scheduler running -->
                <component class="com.ricston.blog.example.StartScheduler"
                    doc:name="Start Scheduler" />
            </otherwise>
        </choice>
        <logger message="Number of failed Records: #[payload.failedRecords] "
            level="INFO" doc:name="Failed Records" />
        <logger message="Number of sucessfull Records: #[payload.successfulRecords]"
            level="INFO" doc:name="Sucessfull Records" />
        <logger message="ElapsedTime #[payload.getElapsedTimeInMillis()]"
            level="INFO" doc:name="Elapsed Time" />
    </batch:on-complete>
</batch:job>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?> | |
<mule xmlns:core="http://www.mulesoft.org/schema/mule/core" | |
xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting" | |
xmlns:batch="http://www.mulesoft.org/schema/mule/batch" xmlns:db="http://www.mulesoft.org/schema/mule/db" | |
xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" | |
xmlns:spring="http://www.springframework.org/schema/beans" version="EE-3.5.0" | |
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |
xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd | |
http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd | |
http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd | |
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd | |
http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd"> | |
<!-- Spring bean "dataSource": a SimpleDriverDataSource using the PostgreSQL driver against
     jdbc:postgresql://localhost/mydatabase, with inline example credentials. The db:generic-config
     below exposes it as "Generic_Database_Configuration". -->
<spring:beans> | |
<spring:bean id="dataSource" | |
class="org.springframework.jdbc.datasource.SimpleDriverDataSource"> | |
<spring:property name="driverClass" value="org.postgresql.Driver" /> | |
<spring:property name="url" | |
value="jdbc:postgresql://localhost/mydatabase" /> | |
<spring:property name="username" value="username" /> | |
<spring:property name="password" value="password" /> | |
</spring:bean> | |
</spring:beans> | |
<db:generic-config name="Generic_Database_Configuration" | |
dataSource-ref="dataSource" doc:name="Generic Database Configuration" /> | |
<!-- Batch job "batch-example-1" with scheduling-strategy ROUND_ROBIN.
     Input: every 10 seconds, selects rows of table1 with key1 between Id+1 and Id+10 (MAX
     watermark over payload['key1']) and logs the selected payload at ERROR level.
     Process: step "sleppThreadForFirstKey" runs a Groovy script that sleeps 20000 ms for the record
     whose key1 is "1" and returns the payload unchanged; "Batch_Step" bulk-inserts into table2 in
     commits of 10; "BatchFailed" logs failed records.
     On complete: logs failed/successful counts and elapsed time at ERROR level. -->
<batch:job name="batch-example-1" scheduling-strategy="ROUND_ROBIN"> | |
<batch:threading-profile poolExhaustedAction="WAIT" /> | |
<batch:input> | |
<poll doc:name="Poll" > | |
<fixed-frequency-scheduler frequency="10" | |
timeUnit="SECONDS" /> | |
<watermark variable="Id" default-expression="#[0]" | |
selector="MAX" selector-expression="#[message.payload['key1']]" /> | |
<processor-chain doc:name="Processor Chain"> | |
<db:select config-ref="Generic_Database_Configuration" | |
doc:name="Database"> | |
<db:parameterized-query><![CDATA[SELECT key1, key2 FROM table1 WHERE key1 BETWEEN #[flowVars['Id'] +1 ] AND #[flowVars['Id'] + 10 ]]]></db:parameterized-query> | |
</db:select> | |
<logger level="ERROR" message="current input phase #[payload]" | |
doc:name="Logger" /> | |
</processor-chain> | |
</poll> | |
</batch:input> | |
<batch:process-records> | |
<batch:step name="sleppThreadForFirstKey"> | |
<scripting:component doc:name="Groovy"> | |
<scripting:script engine="Groovy"><![CDATA[java.util.Map record = (java.util.Map) eventContext.getMessage().getPayload(); | |
String key1 = record.get("key1").toString(); | |
if (key1.equals("1")) { | |
Thread.sleep(20000); | |
} | |
return message.payload; | |
]]></scripting:script> | |
</scripting:component> | |
</batch:step> | |
<batch:step name="Batch_Step"> | |
<batch:commit doc:name="Batch Commit" size="10"> | |
<db:insert config-ref="Generic_Database_Configuration" | |
bulkMode="true" doc:name="Database"> | |
<db:parameterized-query><![CDATA[INSERT into table2(KEY1, KEY2) values(#[payload['key1']], #[payload['key2']])]]></db:parameterized-query> | |
</db:insert> | |
</batch:commit> | |
</batch:step> | |
<batch:step name="BatchFailed" accept-policy="ONLY_FAILURES"> | |
<logger doc:name="Logger" level="ERROR" | |
message="Record with the following payload has failed. Payload:: #[message.payload]" /> | |
</batch:step> | |
</batch:process-records> | |
<batch:on-complete> | |
<logger message="Number of failed Records: #[payload.failedRecords] " | |
level="ERROR" doc:name="Failed Records" /> | |
<logger message="Number of sucessfull Records: #[payload.successfulRecords]" | |
level="ERROR" doc:name="Sucessfull Records" /> | |
<logger message="ElapsedTime #[payload.getElapsedTimeInMillis()]" | |
level="ERROR" doc:name="Elapsed Time" /> | |
</batch:on-complete> | |
</batch:job> | |
</mule> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
ERROR 2014-05-17 16:09:28,856 [pool-12-thread-1] org.mule.api.processor.LoggerMessageProcessor: current input phase [{key1=1, key2=1}, {key1=2, key2=2}, {key1=3, key2=3}, {key1=4, key2=4}, {key1=5, key2=5}, {key1=6, key2=6}, {key1=7, key2=7}, {key1=8, key2=7}, {key1=9, key2=7}, {key1=10, key2=7}] | |
INFO 2014-05-17 16:09:29,070 [pool-12-thread-1] com.mulesoft.module.batch.engine.DefaultBatchEngine: Created instance dc38e666-ddcc-11e3-a66a-87bfac71d161 for batch job batch-example-1 | |
INFO 2014-05-17 16:09:29,073 [pool-12-thread-1] com.mulesoft.module.batch.engine.DefaultBatchEngine: Starting input phase | |
INFO 2014-05-17 16:09:29,073 [pool-12-thread-1] com.mulesoft.module.batch.engine.DefaultBatchEngine: Input phase completed | |
INFO 2014-05-17 16:09:29,120 [pool-12-thread-1] com.mulesoft.module.batch.engine.queue.BatchQueueLoader: Starting loading phase for instance dc38e666-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1 | |
INFO 2014-05-17 16:09:29,150 [pool-12-thread-1] com.mulesoft.module.batch.engine.queue.BatchQueueLoader: Finished loading phase for instance dc38e666-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1. 10 records were loaded | |
INFO 2014-05-17 16:09:29,154 [pool-12-thread-1] com.mulesoft.module.batch.engine.DefaultBatchEngine: Started execution of instance dc38e666-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1 | |
ERROR 2014-05-17 16:09:36,773 [pool-12-thread-1] org.mule.api.processor.LoggerMessageProcessor: current input phase [{key1=11, key2=7}, {key1=12, key2=7}, {key1=13, key2=7}, {key1=14, key2=7}, {key1=15, key2=7}, {key1=16, key2=7}, {key1=17, key2=7}, {key1=18, key2=7}, {key1=19, key2=7}, {key1=20, key2=7}] | |
INFO 2014-05-17 16:09:36,804 [pool-12-thread-1] com.mulesoft.module.batch.engine.DefaultBatchEngine: Created instance e0ead4c9-ddcc-11e3-a66a-87bfac71d161 for batch job batch-example-1 | |
INFO 2014-05-17 16:09:36,805 [pool-12-thread-1] com.mulesoft.module.batch.engine.DefaultBatchEngine: Starting input phase | |
INFO 2014-05-17 16:09:36,805 [pool-12-thread-1] com.mulesoft.module.batch.engine.DefaultBatchEngine: Input phase completed | |
INFO 2014-05-17 16:09:36,809 [pool-12-thread-1] com.mulesoft.module.batch.engine.queue.BatchQueueLoader: Starting loading phase for instance e0ead4c9-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1 | |
INFO 2014-05-17 16:09:36,812 [pool-12-thread-1] com.mulesoft.module.batch.engine.queue.BatchQueueLoader: Finished loading phase for instance e0ead4c9-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1. 10 records were loaded | |
INFO 2014-05-17 16:09:36,815 [pool-12-thread-1] com.mulesoft.module.batch.engine.DefaultBatchEngine: Started execution of instance e0ead4c9-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1 | |
INFO 2014-05-17 16:09:37,027 [batch-job-batch-example-1-work-manager.02] com.mulesoft.module.batch.DefaultBatchStep: Step sleppThreadForFirstKey finished processing all records for instance e0ead4c9-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1 | |
INFO 2014-05-17 16:09:38,854 [batch-job-batch-example-1-work-manager.02] com.mulesoft.module.batch.DefaultBatchStep: Step Batch_Step finished processing all records for instance e0ead4c9-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1 | |
INFO 2014-05-17 16:09:40,741 [batch-job-batch-example-1-work-manager.02] com.mulesoft.module.batch.DefaultBatchStep: Step BatchFailed finished processing all records for instance e0ead4c9-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1 | |
INFO 2014-05-17 16:09:40,743 [[testbatch].Batch Dispatcher thread] com.mulesoft.module.batch.engine.DefaultBatchEngine: Finished execution for instance e0ead4c9-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1 | |
INFO 2014-05-17 16:09:40,743 [[testbatch].Batch Dispatcher thread] com.mulesoft.module.batch.engine.DefaultBatchEngine: Starting execution of onComplete phase for instance e0ead4c9-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1 | |
ERROR 2014-05-17 16:09:40,748 [[testbatch].Batch Dispatcher thread] org.mule.api.processor.LoggerMessageProcessor: Number of failed Records: 0 | |
ERROR 2014-05-17 16:09:40,752 [[testbatch].Batch Dispatcher thread] org.mule.api.processor.LoggerMessageProcessor: Number of sucessfull Records: 10 | |
ERROR 2014-05-17 16:09:40,776 [[testbatch].Batch Dispatcher thread] org.mule.api.processor.LoggerMessageProcessor: ElapsedTime 3931 | |
INFO 2014-05-17 16:09:40,777 [[testbatch].Batch Dispatcher thread] com.mulesoft.module.batch.engine.DefaultBatchEngine: Finished execution of onComplete phase for instance e0ead4c9-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1 | |
ERROR 2014-05-17 16:09:46,773 [pool-12-thread-1] org.mule.api.processor.LoggerMessageProcessor: current input phase [{key1=21, key2=7}, {key1=22, key2=7}, {key1=23, key2=7}, {key1=24, key2=7}, {key1=25, key2=7}, {key1=26, key2=7}, {key1=27, key2=7}, {key1=28, key2=7}, {key1=29, key2=7}, {key1=30, key2=7}] | |
INFO 2014-05-17 16:09:46,786 [pool-12-thread-1] com.mulesoft.module.batch.engine.DefaultBatchEngine: Created instance e6e12b6d-ddcc-11e3-a66a-87bfac71d161 for batch job batch-example-1 | |
INFO 2014-05-17 16:09:46,786 [pool-12-thread-1] com.mulesoft.module.batch.engine.DefaultBatchEngine: Starting input phase | |
INFO 2014-05-17 16:09:46,786 [pool-12-thread-1] com.mulesoft.module.batch.engine.DefaultBatchEngine: Input phase completed | |
INFO 2014-05-17 16:09:46,790 [pool-12-thread-1] com.mulesoft.module.batch.engine.queue.BatchQueueLoader: Starting loading phase for instance e6e12b6d-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1 | |
INFO 2014-05-17 16:09:46,792 [pool-12-thread-1] com.mulesoft.module.batch.engine.queue.BatchQueueLoader: Finished loading phase for instance e6e12b6d-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1. 10 records were loaded | |
INFO 2014-05-17 16:09:46,795 [pool-12-thread-1] com.mulesoft.module.batch.engine.DefaultBatchEngine: Started execution of instance e6e12b6d-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1 | |
INFO 2014-05-17 16:09:46,824 [batch-job-batch-example-1-work-manager.03] com.mulesoft.module.batch.DefaultBatchStep: Step sleppThreadForFirstKey finished processing all records for instance e6e12b6d-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1 | |
INFO 2014-05-17 16:09:48,773 [batch-job-batch-example-1-work-manager.05] com.mulesoft.module.batch.DefaultBatchStep: Step Batch_Step finished processing all records for instance e6e12b6d-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1 | |
INFO 2014-05-17 16:09:50,674 [batch-job-batch-example-1-work-manager.01] com.mulesoft.module.batch.DefaultBatchStep: Step sleppThreadForFirstKey finished processing all records for instance dc38e666-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1 | |
INFO 2014-05-17 16:09:50,748 [batch-job-batch-example-1-work-manager.03] com.mulesoft.module.batch.DefaultBatchStep: Step BatchFailed finished processing all records for instance e6e12b6d-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1 | |
INFO 2014-05-17 16:09:50,750 [[testbatch].Batch Dispatcher thread] com.mulesoft.module.batch.engine.DefaultBatchEngine: Finished execution for instance e6e12b6d-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1 | |
INFO 2014-05-17 16:09:50,750 [[testbatch].Batch Dispatcher thread] com.mulesoft.module.batch.engine.DefaultBatchEngine: Starting execution of onComplete phase for instance e6e12b6d-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1 | |
ERROR 2014-05-17 16:09:50,751 [[testbatch].Batch Dispatcher thread] org.mule.api.processor.LoggerMessageProcessor: Number of failed Records: 0 | |
ERROR 2014-05-17 16:09:50,751 [[testbatch].Batch Dispatcher thread] org.mule.api.processor.LoggerMessageProcessor: Number of sucessfull Records: 10 | |
ERROR 2014-05-17 16:09:50,752 [[testbatch].Batch Dispatcher thread] org.mule.api.processor.LoggerMessageProcessor: ElapsedTime 3958 | |
INFO 2014-05-17 16:09:50,752 [[testbatch].Batch Dispatcher thread] com.mulesoft.module.batch.engine.DefaultBatchEngine: Finished execution of onComplete phase for instance e6e12b6d-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1 | |
INFO 2014-05-17 16:09:51,789 [batch-job-batch-example-1-work-manager.04] com.mulesoft.module.batch.DefaultBatchStep: Step Batch_Step finished processing all records for instance dc38e666-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1 | |
INFO 2014-05-17 16:09:53,757 [batch-job-batch-example-1-work-manager.02] com.mulesoft.module.batch.DefaultBatchStep: Step BatchFailed finished processing all records for instance dc38e666-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1 | |
INFO 2014-05-17 16:09:53,758 [[testbatch].Batch Dispatcher thread] com.mulesoft.module.batch.engine.DefaultBatchEngine: Finished execution for instance dc38e666-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1 | |
INFO 2014-05-17 16:09:53,759 [[testbatch].Batch Dispatcher thread] com.mulesoft.module.batch.engine.DefaultBatchEngine: Starting execution of onComplete phase for instance dc38e666-ddcc-11e3-a66a-87bfac71d161 of job batch-example-1 | |
ERROR 2014-05-17 16:09:53,759 [[testbatch].Batch Dispatcher thread] org.mule.api.processor.LoggerMessageProcessor: Number of failed Records: 0 | |
ERROR 2014-05-17 16:09:53,760 [[testbatch].Batch Dispatcher thread] org.mule.api.processor.LoggerMessageProcessor: Number of sucessfull Records: 10 | |
ERROR 2014-05-17 16:09:53,760 [[testbatch].Batch Dispatcher thread] org.mule.api.processor.LoggerMessageProcessor: ElapsedTime 24607 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Poll driven by a cron scheduler with expression "0 0/1 * 1/1 * ? *". -->
<poll> | |
<schedulers:cron-scheduler expression="0 0/1 * 1/1 * ? *" /> | |
</poll> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Poll driven by a fixed-frequency scheduler: frequency 1, time unit MINUTES, start delay 0. -->
<poll> | |
<fixed-frequency-scheduler frequency="1" timeUnit="MINUTES" startDelay="0"/> | |
</poll> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Poll every minute with a watermark "Id" (default #[0]) that applies the MAX selector to payload['id']. -->
<poll> | |
<fixed-frequency-scheduler frequency="1" timeUnit="MINUTES" /> | |
<watermark variable="Id" default-expression="#[0]" selector="MAX" selector-expression="#[payload['id']]" /> | |
</poll> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Poll every minute with a watermark "Id" (default #[0]) updated from flow variable "myId";
     the set-variable after the poll assigns "myId" the 'id' of the last payload element. -->
<poll> | |
<fixed-frequency-scheduler frequency="1" timeUnit="MINUTES" /> | |
<watermark variable="Id" default-expression="#[0]" update-expression="#[flowVars['myId']]" /> | |
</poll>... | |
<set-variable variableName="myId" value="#[payload[payload.size()-1]['id']]" /> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Hourly poll that selects KEY1 and KEY2 from every row of table1 (no WHERE clause, no watermark). -->
<poll doc:name="Poll"> | |
<fixed-frequency-scheduler frequency="1" timeUnit="HOURS"/> | |
<db:select config-ref="Generic_Database_Configuration"> | |
<db:parameterized-query><![CDATA[SELECT KEY1, KEY2 FROM table1 ]]></db:parameterized-query> | |
</db:select> | |
</poll> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ricston.blog.example; | |
import java.util.Collection; | |
import org.mule.api.MuleContext; | |
import org.mule.api.MuleEventContext; | |
import org.mule.api.MuleException; | |
import org.mule.api.schedule.Scheduler; | |
import org.mule.modules.schedulers.cron.CronScheduler; | |
import org.mule.transport.polling.schedule.FixedFrequencyScheduler; | |
import org.mule.util.Predicate; | |
public class SchedulerWrapper{ | |
protected Scheduler getScheduler(MuleEventContext eventContext) { | |
MuleContext context = eventContext.getMuleContext(); | |
final String flowName = eventContext.getFlowConstruct().getName(); | |
// Get poll scheduler so as to stop and start and poll the endpoint. | |
Collection<Scheduler> pollSchedulers = context.getRegistry() | |
.lookupScheduler(new Predicate<String>() { | |
@Override | |
public boolean evaluate(String s) { | |
// the scheduler name should be polling:// + flow | |
// name/batchjob name + /.. | |
if (s.startsWith("polling://" + flowName + "/")) { | |
return true; | |
} else { | |
return false; | |
} | |
} | |
}); | |
// There should be only one scheduler | |
if (pollSchedulers.size() == 1) { | |
Scheduler pollScheduler = pollSchedulers.iterator().next(); | |
return pollScheduler; | |
} else { | |
throw new IllegalStateException("Was expecting one scheduler but there were:" + pollSchedulers.size()); | |
} | |
} | |
protected void stopScheduler(Scheduler scheduler) throws MuleException { | |
if (scheduler instanceof FixedFrequencyScheduler) { | |
@SuppressWarnings("rawtypes") | |
FixedFrequencyScheduler fixedFrequencyScheduler = (FixedFrequencyScheduler) scheduler; | |
fixedFrequencyScheduler.stop(); | |
} else if (scheduler instanceof CronScheduler) { | |
CronScheduler cronScheduler = (CronScheduler) scheduler; | |
cronScheduler.stop(); | |
} else { | |
throw new IllegalArgumentException( | |
"Expected instance of org.mule.transport.polling.schedule.FixedFrequencyScheduler or org.mule.transport.polling.schedule.CronScheduler, but argument was " | |
+ scheduler.getClass()); | |
} | |
} | |
protected void startScheduler(Scheduler scheduler) throws MuleException { | |
if (scheduler instanceof FixedFrequencyScheduler) { | |
@SuppressWarnings("rawtypes") | |
FixedFrequencyScheduler fixedFrequencyScheduler = (FixedFrequencyScheduler) scheduler; | |
fixedFrequencyScheduler.start(); | |
} else if (scheduler instanceof CronScheduler) { | |
CronScheduler cronScheduler = (CronScheduler) scheduler; | |
cronScheduler.start(); | |
} else { | |
throw new IllegalArgumentException( | |
"Expected instance of org.mule.transport.polling.schedule.FixedFrequencyScheduler or org.mule.transport.polling.schedule.CronScheduler, but argument was " + scheduler.getClass()); | |
} | |
} | |
protected void scheduleScheduler(Scheduler scheduler) throws Exception{ | |
if (scheduler instanceof FixedFrequencyScheduler) { | |
@SuppressWarnings("rawtypes") | |
FixedFrequencyScheduler fixedFrequencyScheduler = (FixedFrequencyScheduler) scheduler; | |
fixedFrequencyScheduler.schedule(); | |
} else if (scheduler instanceof CronScheduler) { | |
CronScheduler cronScheduler = (CronScheduler) scheduler; | |
cronScheduler.schedule(); | |
} else { | |
throw new IllegalArgumentException( | |
"Expected instance of org.mule.transport.polling.schedule.FixedFrequencyScheduler or org.mule.transport.polling.schedule.CronScheduler, but argument was " + scheduler.getClass()); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ricston.blog.example; | |
import org.mule.api.MuleEventContext; | |
import org.mule.api.lifecycle.Callable; | |
import org.mule.api.schedule.Scheduler; | |
public class ScheduleScheduler extends SchedulerWrapper implements Callable { | |
@Override | |
public Object onCall(MuleEventContext eventContext) throws Exception { | |
Scheduler scheduler = getScheduler(eventContext); | |
this.scheduleScheduler(scheduler); | |
return eventContext.getMessage().getPayload(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<!-- Spring bean "dataSource": a SimpleDriverDataSource using the embedded Derby driver; the JDBC URL
     carries create=true and the credentials are inline example values. -->
<spring:beans> | |
<spring:bean id="dataSource" class="org.springframework.jdbc.datasource.SimpleDriverDataSource"> | |
<spring:property name="driverClass" value="org.apache.derby.jdbc.EmbeddedDriver"/> | |
<spring:property name="url" value="jdbc:derby:target/database/message;create=true"/> | |
<spring:property name="username" value="user"/> | |
<spring:property name="password" value="password"/> | |
</spring:bean> | |
</spring:beans> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ricston.blog.example; | |
import org.mule.api.MuleEventContext; | |
import org.mule.api.lifecycle.Callable; | |
import org.mule.api.schedule.Scheduler; | |
public class StartScheduler extends SchedulerWrapper implements Callable { | |
@Override | |
public Object onCall(MuleEventContext eventContext) throws Exception { | |
Scheduler scheduler = this.getScheduler(eventContext); | |
this.startScheduler(scheduler); | |
return eventContext.getMessage().getPayload(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.ricston.blog.example; | |
import org.mule.api.MuleEventContext; | |
import org.mule.api.lifecycle.Callable; | |
import org.mule.api.schedule.Scheduler; | |
public class StopScheduler extends SchedulerWrapper implements Callable { | |
@Override | |
public Object onCall(MuleEventContext eventContext) throws Exception { | |
Scheduler scheduler = this.getScheduler(eventContext); | |
this.stopScheduler(scheduler); | |
return eventContext.getMessage().getPayload(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment