@dsyer
Created January 22, 2011 12:38
BATCH-1651 test code
target
bin
.settings
.classpath
.project
derby*
embedded*
testcase.zip
lib
<?xml version="1.0" encoding="UTF-8"?>
<beansProjectDescription>
<version>1</version>
<pluginVersion><![CDATA[2.5.0.201010221000-RELEASE]]></pluginVersion>
<configSuffixes>
<configSuffix><![CDATA[xml]]></configSuffix>
</configSuffixes>
<enableImports><![CDATA[true]]></enableImports>
<configs>
<config>src/main/resources/applicationContext.xml</config>
<config>src/main/resources/jobs/applicationMainJob.xml</config>
<config>src/main/resources/jobs/loadEntryFileSteps.xml</config>
<config>src/main/resources/jobs/writeOutputFilesSteps.xml</config>
<config>src/test/resources/com/company/springbatch/testcase/JobExecutionTests-context.xml</config>
</configs>
<configSets>
</configSets>
</beansProjectDescription>

Test code for BATCH-1651
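JobExecutionTests launches the applicationMainJob job against an embedded Derby database (created under ./embeddedDatabase), so running the Maven test phase should be enough to reproduce the issue.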

package com.company.springbatch.testcase.constants;
public class ApplicationConstants {
public static final String DELIMITER_FIELD = "|";
public static final String NAMES_DELIMITED_LINE_TOKENIZER = "fileline";
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.5.xsd">
<!-- Load the module properties -->
<bean id="propertyPlaceholderConfigurer"
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>classpath:/system.properties</value>
<value>classpath:/queries.properties</value>
</list>
</property>
<property name="ignoreUnresolvablePlaceholders" value="false" />
</bean>
<!-- DataSource configuration -->
<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource"
destroy-method="close">
<property name="driverClassName" value="org.apache.derby.jdbc.EmbeddedDriver" />
<property name="url" value="${database.pool.url}" />
<property name="username" value="APP" />
<property name="password" value="APP" />
<property name="testOnBorrow" value="false" />
<property name="testOnReturn" value="false" />
<property name="testWhileIdle" value="false" />
<property name="timeBetweenEvictionRunsMillis"
value="${database.pool.timeBetweenEvictionRunsMillis}" />
<property name="initialSize" value="${database.pool.initialSize}" />
<property name="maxActive" value="${database.pool.maxActive}" />
<property name="maxIdle" value="${database.pool.maxIdle}" />
<property name="minIdle" value="${database.pool.minIdle}" />
<property name="maxWait" value="1800000" />
<property name="poolPreparedStatements" value="${database.pool.poolPreparedStatements}" />
<property name="maxOpenPreparedStatements" value="${database.pool.maxOpenPreparedStatements}" />
<property name="defaultAutoCommit" value="false" />
</bean>
<!-- Transaction manager of the application -->
<bean id="transactionManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource" />
</bean>
<!-- Job repository configuration -->
<bean id="jobRepository"
class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
<property name="dataSource" ref="dataSource" />
<property name="databaseType" value="DERBY" />
<property name="transactionManager" ref="transactionManager" />
</bean>
<!-- Job explorer configuration -->
<bean id="jobExplorer"
class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean">
<property name="dataSource" ref="dataSource" />
<property name="tablePrefix" value="TESTCASE_" />
</bean>
<!-- Job launcher configuration -->
<bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
<property name="taskExecutor">
<bean class="org.springframework.core.task.SyncTaskExecutor" />
</property>
</bean>
<!-- Run id incrementer used to create a new job instance on each run -->
<bean id="runIdIncrementer"
class="com.company.springbatch.testcase.support.RunIdIncrementer" />
<bean id="applicationDAORef"
class="com.company.springbatch.testcase.dao.impl.ApplicationDAOImpl">
<property name="registersCountQuery" value="${application.select.registers.count}" />
<property name="dataSource" ref="dataSource" />
</bean>
</beans>
package com.company.springbatch.testcase.dao;
public interface ApplicationDAO {
/**
* Recovers the number of records that still need to be processed
*
* @param jobInstanceId
* Id of the job instance that is running
* @return the number of records to be processed
* @author diego.moreno
*/
public int recoveryRegistersQtd(Integer jobInstanceId);
}
package com.company.springbatch.testcase.dao.impl;
import java.sql.Types;
import org.apache.log4j.Logger;
import org.springframework.jdbc.core.support.JdbcDaoSupport;
import com.company.springbatch.testcase.dao.ApplicationDAO;
public class ApplicationDAOImpl extends JdbcDaoSupport implements
ApplicationDAO {
private static Logger log = Logger.getLogger(ApplicationDAOImpl.class);
private String registersCountQuery;
/**
* @return the registersCountQuery
*/
public String getRegistersCountQuery() {
return registersCountQuery;
}
/**
* @param registersCountQuery
* the registersCountQuery to set
*/
public void setRegistersCountQuery(String registersCountQuery) {
this.registersCountQuery = registersCountQuery;
}
public int recoveryRegistersQtd(Integer jobInstanceId) {
log.debug("Recovering the registers quantity...");
return this.getJdbcTemplate().queryForInt(registersCountQuery,
new Object[] { jobInstanceId }, new int[] { Types.INTEGER });
}
}
package com.company.springbatch.testcase.helper;
import java.text.DecimalFormat;
import java.text.NumberFormat;
public class ApplicationHelper {
public static String formatFileSeq(int num) {
NumberFormat formatter = new DecimalFormat("0000");
return formatter.format(num);
}
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.5.xsd">
<import resource="classpath:/applicationContext.xml" />
<import resource="classpath:/jobs/loadEntryFileSteps.xml" />
<import resource="classpath:/jobs/writeOutputFilesSteps.xml" />
<job id="applicationMainJob" incrementer="runIdIncrementer"
xmlns="http://www.springframework.org/schema/batch">
<step id="loadStep1" parent="loadStep" next="startProcessStep" />
<step id="startProcessStep" next="writeStep">
<tasklet ref="processPartitioner" transaction-manager="transactionManager" />
</step>
<step id="writeStep" >
<partition step="partitionedReadStep" partitioner="partitioner">
<handler grid-size="2" task-executor="taskExecutor" />
</partition>
</step>
</job>
</beans>
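-- Control table: the load step inserts one row per input line; the partitioned write step later stamps each row with its process date, status and output file name.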
DROP TABLE TEST_CASE_CONTROL;
CREATE TABLE TEST_CASE_CONTROL (
REGISTER_ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1 ,INCREMENT BY 1),
JOB_INSTANCE_ID INTEGER NOT NULL,
FILELINE VARCHAR(1000) NOT NULL,
LOAD_DATE TIMESTAMP NOT NULL,
PROCESS_DATE TIMESTAMP,
PROCESS_STATUS CHAR(1) DEFAULT 'N' NOT NULL,
OUTPUT_FILENAME VARCHAR(1000),
CONSTRAINT PK_TEST_CASE PRIMARY KEY (REGISTER_ID)
);
package com.company.springbatch.testcase.support;
import java.util.HashMap;
import java.util.Map;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import com.company.springbatch.testcase.helper.ApplicationHelper;
public class ControlPartitioner implements Partitioner {
private int registersQtd;
private int maxThreadNumber;
private String fileName;
private static final String START_KEY_NAME = "start_control_id";
private static final String END_KEY_NAME = "end_control_id";
private static final String FILE_NAME = "file_name";
private static final String PARTITION_KEY = "partition";
/**
* @return the registersQtd
*/
public int getRegistersQtd() {
return registersQtd;
}
/**
* @param registersQtd
* the registersQtd to set
*/
public void setRegistersQtd(int registersQtd) {
this.registersQtd = registersQtd;
}
/**
* @return the maxThreadNumber
*/
public int getMaxThreadNumber() {
return maxThreadNumber;
}
/**
* @param maxThreadNumber
* the maxThreadNumber to set
*/
public void setMaxThreadNumber(int maxThreadNumber) {
this.maxThreadNumber = maxThreadNumber;
}
/**
* @return the fileName
*/
public String getFileName() {
return fileName;
}
/**
* @param fileName
* the fileName to set
*/
public void setFileName(String fileName) {
this.fileName = fileName;
}
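/**
* Splits the unprocessed records into roughly equal slices, one per partition.
* Each partition's ExecutionContext carries a start index, an end index and the
* name of the output file it should produce.
*/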
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> map = new HashMap<String, ExecutionContext>(
gridSize);
if (registersQtd <= maxThreadNumber) {
maxThreadNumber = registersQtd - 1;
}
if (maxThreadNumber <= 0) {
maxThreadNumber = 1;
}
int slice = registersQtd / maxThreadNumber;
int contadorRegistrosInicio = 0;
int contadorRegistrosFim = slice;
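// The last partition's end key is set to registersQtd so the remainder of the integer division above is not lost.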
for (int i = 0; i < maxThreadNumber; i++) {
ExecutionContext context = new ExecutionContext();
context.putInt(START_KEY_NAME, contadorRegistrosInicio);
if (i + 1 == maxThreadNumber) {
context.putInt(END_KEY_NAME, registersQtd);
} else {
context.putInt(END_KEY_NAME, contadorRegistrosFim);
}
context.putString(FILE_NAME,
this.getFileName() + ApplicationHelper.formatFileSeq(i + 1));
contadorRegistrosInicio = contadorRegistrosFim;
contadorRegistrosFim = contadorRegistrosFim + slice;
map.put(PARTITION_KEY + i, context);
}
return map;
}
}
/*
* Copyright 2006-2007 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package test.jdbc.datasource;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import javax.sql.DataSource;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.io.Resource;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
public class DataSourceInitializer implements InitializingBean, DisposableBean {
private Resource[] initScripts;
private Resource destroyScript;
private DataSource dataSource;
private boolean initialize = false;
private Log logger = LogFactory.getLog(getClass());
private boolean initialized = false;
public void setInitialize(boolean initialize) {
this.initialize = initialize;
}
public void destroy() throws Exception {
if (!initialized) {
return;
}
try {
if (destroyScript!=null) {
doExecuteScript(destroyScript);
initialized = false;
}
}
catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.warn("Could not execute destroy script [" + destroyScript + "]", e);
}
else {
logger.warn("Could not execute destroy script [" + destroyScript + "]");
}
}
}
public void afterPropertiesSet() throws Exception {
Assert.notNull(dataSource);
logger.info("Initializing with scripts: "+Arrays.asList(initScripts));
if (!initialized && initialize) {
try {
doExecuteScript(destroyScript);
}
catch (Exception e) {
logger.debug("Could not execute destroy script [" + destroyScript + "]", e);
}
if (initScripts != null) {
for (int i = 0; i < initScripts.length; i++) {
Resource initScript = initScripts[i];
logger.info("Executing init script: "+initScript);
doExecuteScript(initScript);
}
}
initialized = true;
}
}
private void doExecuteScript(final Resource scriptResource) {
if (scriptResource == null || !scriptResource.exists())
return;
TransactionTemplate transactionTemplate = new TransactionTemplate(new DataSourceTransactionManager(dataSource));
transactionTemplate.execute(new TransactionCallback() {
@SuppressWarnings("unchecked")
public Object doInTransaction(TransactionStatus status) {
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
String[] scripts;
try {
scripts = StringUtils.delimitedListToStringArray(stripComments(IOUtils.readLines(scriptResource
.getInputStream())), ";");
}
catch (IOException e) {
throw new BeanInitializationException("Cannot load script from [" + scriptResource + "]", e);
}
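// Execute each statement separately; failures from DROP statements are ignored so the scripts can run against an empty database.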
for (int i = 0; i < scripts.length; i++) {
String script = scripts[i].trim();
if (StringUtils.hasText(script)) {
try {
jdbcTemplate.execute(scripts[i]);
} catch (DataAccessException e) {
if (!script.toUpperCase().startsWith("DROP")) {
throw e;
}
}
}
}
return null;
}
});
}
private String stripComments(List<String> list) {
StringBuffer buffer = new StringBuffer();
for (String line : list) {
if (!line.startsWith("//") && !line.startsWith("--")) {
buffer.append(line + "\n");
}
}
return buffer.toString();
}
public Class<DataSource> getObjectType() {
return DataSource.class;
}
public void setInitScripts(Resource[] initScripts) {
this.initScripts = initScripts;
}
public void setDestroyScript(Resource destroyScript) {
this.destroyScript = destroyScript;
}
public void setDataSource(DataSource dataSource) {
this.dataSource = dataSource;
}
}
package com.company.springbatch.testcase.listener;
import org.apache.log4j.Logger;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.SkipListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import com.company.springbatch.testcase.to.EntryTO;
public class EntryFileListener implements SkipListener<EntryTO, EntryTO>,
StepExecutionListener {
private Logger log = Logger.getLogger(EntryFileListener.class);
public ExitStatus afterStep(StepExecution arg0) {
log.info("End of loading of the input file");
return null;
}
public void beforeStep(StepExecution arg0) {
log.info("Start of the execution");
log.info("Start of loading of the input file");
}
public void onSkipInProcess(EntryTO entryTO, Throwable arg1) {
log.error("Error processing the register [" + entryTO.getFileline()
+ "]");
}
public void onSkipInRead(Throwable arg0) {
log.error("Error reading the input file");
}
public void onSkipInWrite(EntryTO entryTO, Throwable arg1) {
log.error("Error writing the register [" + entryTO.getFileline()
+ "] in the table");
}
}
package com.company.springbatch.testcase.processor;
import org.springframework.batch.item.ItemProcessor;
import com.company.springbatch.testcase.to.EntryTO;
public class EntryFileProcessor implements ItemProcessor<EntryTO, EntryTO> {
private Integer jobInstanceId;
/**
* @return the jobInstanceId
*/
public Integer getJobInstanceId() {
return jobInstanceId;
}
/**
* @param jobInstanceId
* the jobInstanceId to set
*/
public void setJobInstanceId(Integer jobInstanceId) {
this.jobInstanceId = jobInstanceId;
}
public EntryTO process(EntryTO entryTO) throws Exception {
entryTO.setJobInstanceId(this.getJobInstanceId());
return entryTO;
}
}
package com.company.springbatch.testcase.to;
/**
* TO to store the entry file data
*
* @author diego.moreno
* @since 2010-11-12
*/
public class EntryTO {
/** Line of the entry file */
private String fileline;
private Integer jobInstanceId;
/**
* @return The fileline
*/
public String getFileline() {
return fileline;
}
/**
* @param fileline
* The fileline to set
*/
public void setFileline(String fileline) {
this.fileline = fileline;
}
/**
* @return the jobInstanceId
*/
public Integer getJobInstanceId() {
return jobInstanceId;
}
/**
* @param jobInstanceId
* the jobInstanceId to set
*/
public void setJobInstanceId(Integer jobInstanceId) {
this.jobInstanceId = jobInstanceId;
}
}
ga
g
ad
gfdg
ag
as
gfa
fg
agf
afg
adf
gadf
ga
fdg
adfg
asdfgr
asdfg
asdf
gad
fga
fg
adfg
adfg
af
gadf
ga
fg
adfg
asdfga
fga
grswet
gra
gf
sagf
afg
ad
gfrae
gr
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
<import resource="classpath:/jobs/applicationMainJob.xml"/>
<!-- Initialise the database before every test case: -->
<bean id="dataSourceInitializer" class="test.jdbc.datasource.DataSourceInitializer">
<property name="dataSource" ref="dataSource"/>
<property name="initialize" value="${batch.data.source.init}"/>
<property name="initScripts">
<list>
<value>${batch.drop.script}</value>
<value>${batch.schema.script}</value>
<value>${batch.business.schema.script}</value>
</list>
</property>
</bean>
</beans>
/*
* Copyright 2006-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.company.springbatch.testcase;
import static org.junit.Assert.assertEquals;
import java.io.File;
import org.apache.commons.io.FileUtils;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
public class JobExecutionTests {
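// Delete the embedded Derby directory so every test run starts with a fresh database.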
@BeforeClass
public static void init() throws Exception {
FileUtils.deleteQuietly(new File("embeddedDatabase"));
}
@Autowired
@Qualifier("applicationMainJob")
private Job job;
@Autowired
private JobLauncher jobLauncher;
@Test
public void testLaunchJob() throws Exception {
JobExecution jobExecution = jobLauncher.run(job, new JobParametersBuilder().addString("entry.file.name", "input_file_13.TXT").toJobParameters());
assertEquals(BatchStatus.COMPLETED, jobExecution.getStatus());
}
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.5.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd">
<import resource="classpath:/applicationContext.xml" />
<step id="loadStep" xmlns="http://www.springframework.org/schema/batch">
<tasklet>
<chunk reader="entryReader" processor="entryProcessor" writer="tableWriter"
commit-interval="${entry.commit.interval}" skip-limit="${skip.limit}">
<skippable-exception-classes>
<include
class="com.company.springbatch.testcase.exception.TestcaseException" />
<include
class="org.springframework.batch.item.file.FlatFileParseException" />
<include class="org.springframework.dao.DataAccessException" />
<exclude
class="org.springframework.dao.NonTransientDataAccessResourceException" />
<exclude
class="org.springframework.dao.InvalidDataAccessResourceUsageException" />
<exclude
class="org.springframework.dao.PermissionDeniedDataAccessException" />
</skippable-exception-classes>
</chunk>
<listeners>
<listener ref="entryListener" />
</listeners>
</tasklet>
</step>
<!-- Reader responsible for reading the input file -->
<bean id="entryReader" class="org.springframework.batch.item.file.FlatFileItemReader"
scope="step">
<property name="resource"
value="file:${input.directory}/#{jobParameters[entry.file.name]}" />
<property name="linesToSkip" value="0" />
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="fieldSetMapper">
<bean
class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
<property name="targetType"
value="com.company.springbatch.testcase.to.EntryTO" />
</bean>
</property>
<property name="lineTokenizer">
<bean
class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="delimiter">
<util:constant
static-field="com.company.springbatch.testcase.constants.ApplicationConstants.DELIMITER_FIELD" />
</property>
<property name="names">
<util:constant
static-field="com.company.springbatch.testcase.constants.ApplicationConstants.NAMES_DELIMITED_LINE_TOKENIZER" />
</property>
</bean>
</property>
</bean>
</property>
</bean>
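<!-- Step-scoped processor: stamps each record with the running job's id via late binding -->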
<bean id="entryProcessor"
class="com.company.springbatch.testcase.processor.EntryFileProcessor"
scope="step">
<property name="jobInstanceId" value="#{stepExecution.jobExecution.jobId}" />
</bean>
<bean id="tableWriter"
class="org.springframework.batch.item.database.JdbcBatchItemWriter">
<property name="dataSource" ref="dataSource" />
<property name="sql" value="${application.insert}" />
<property name="itemSqlParameterSourceProvider">
<bean
class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" />
</property>
</bean>
<bean id="entryListener"
class="com.company.springbatch.testcase.listener.EntryFileListener"
scope="step" />
</beans>
log4j.rootCategory=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - <%m>%n
log4j.category.org.apache.activemq=ERROR
log4j.category.org.springframework.batch=DEBUG
log4j.category.org.springframework.transaction=INFO
log4j.category.org.hibernate.SQL=DEBUG
# for debugging datasource initialization
# log4j.category.test.jdbc=DEBUG
package com.company.springbatch.testcase.listener;
import org.apache.log4j.Logger;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.SkipListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import com.company.springbatch.testcase.to.OutputTO;
public class OutputFileListener implements SkipListener<OutputTO, OutputTO>,
StepExecutionListener {
private Logger log = Logger.getLogger(OutputFileListener.class);
public ExitStatus afterStep(StepExecution arg0) {
log.info("End of writing of the output files");
log.info("End of the execution");
return null;
}
public void beforeStep(StepExecution arg0) {
log.info("Start of writing of the output files");
}
public void onSkipInProcess(OutputTO outputTO, Throwable throwable) {
log.error("Error processing the register [" + outputTO.getRegisterId()
+ "]");
}
public void onSkipInRead(Throwable arg0) {
log.error("Error reading the control table", arg0);
}
public void onSkipInWrite(OutputTO outputTO, Throwable arg1) {
log.error("Error writing the register [" + outputTO.getRegisterId()
+ "] in the output file [" + outputTO.getOutputFilename() + "]", arg1);
}
}
package com.company.springbatch.testcase.processor;
import org.springframework.batch.item.ItemProcessor;
import com.company.springbatch.testcase.to.OutputTO;
public class OutputFileProcessor implements ItemProcessor<OutputTO, OutputTO> {
private String filename;
/**
* @return the filename
*/
public String getFilename() {
return filename;
}
/**
* @param filename
* the filename to set
*/
public void setFilename(String filename) {
this.filename = filename;
}
public OutputTO process(OutputTO outputTO) throws Exception {
outputTO.setOutputFilename(this.getFilename());
outputTO.setProcessStatus("Y");
return outputTO;
}
}
package com.company.springbatch.testcase.to;
import java.util.Date;
public class OutputTO {
private Integer registerId;
private Integer jobInstanceId;
private String fileline;
private Date loadDate;
private Date processDate;
private String processStatus;
private String outputFilename;
/**
* @return the registerId
*/
public Integer getRegisterId() {
return registerId;
}
/**
* @param registerId
* the registerId to set
*/
public void setRegisterId(Integer registerId) {
this.registerId = registerId;
}
/**
* @return the jobInstanceId
*/
public Integer getJobInstanceId() {
return jobInstanceId;
}
/**
* @param jobInstanceId
* the jobInstanceId to set
*/
public void setJobInstanceId(Integer jobInstanceId) {
this.jobInstanceId = jobInstanceId;
}
/**
* @return the fileline
*/
public String getFileline() {
return fileline;
}
/**
* @param fileline
* the fileline to set
*/
public void setFileline(String fileline) {
this.fileline = fileline;
}
/**
* @return the loadDate
*/
public Date getLoadDate() {
return loadDate;
}
/**
* @param loadDate
* the loadDate to set
*/
public void setLoadDate(Date loadDate) {
this.loadDate = loadDate;
}
/**
* @return the processDate
*/
public Date getProcessDate() {
return processDate;
}
/**
* @param processDate
* the processDate to set
*/
public void setProcessDate(Date processDate) {
this.processDate = processDate;
}
/**
* @return the processStatus
*/
public String getProcessStatus() {
return processStatus;
}
/**
* @param processStatus
* the processStatus to set
*/
public void setProcessStatus(String processStatus) {
this.processStatus = processStatus;
}
/**
* @return the outputFilename
*/
public String getOutputFilename() {
return outputFilename;
}
/**
* @param outputFilename
* the outputFilename to set
*/
public void setOutputFilename(String outputFilename) {
this.outputFilename = outputFilename;
}
}
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.company.springbatch.testcase</groupId>
<artifactId>testcase</artifactId>
<version>1.0.0</version>
<properties>
<spring.batch.version>2.1.6.CI-SNAPSHOT</spring.batch.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>${spring.batch.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<version>${spring.batch.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.15</version>
<exclusions>
<exclusion>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
</exclusion>
<exclusion>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jmx</groupId>
<artifactId>jmxri</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>2.5.6</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>2.5.6</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.4</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.6.2.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>buildnumber-maven-plugin</artifactId>
<version>1.0-beta-3</version>
<configuration>
<format>{0,date,dd-MM-yyyy HH:mm:ss}</format>
<items>
<item>timestamp</item>
</items>
</configuration>
<executions>
<execution>
<phase>validate</phase>
<goals>
<goal>create</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.1</version>
<configuration>
<archive>
<manifest>
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
<mainClass>org.springframework.batch.core.launch.support.CommandLineJobRunner</mainClass>
<addClasspath>true</addClasspath>
<classpathPrefix>../lib</classpathPrefix>
</manifest>
<manifestEntries>
<Implementation-Version>${project.version}</Implementation-Version>
<Implementation-Build>${buildNumber}</Implementation-Build>
<Implementation-Title>${project.artifactId}</Implementation-Title>
</manifestEntries>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>
application.insert= \
INSERT INTO TEST_CASE_CONTROL \
(JOB_INSTANCE_ID, \
FILELINE, \
LOAD_DATE) \
VALUES(:jobInstanceId, \
:fileline, \
CURRENT_TIMESTAMP)
application.select.registers.count= \
SELECT COUNT(REGISTER_ID) \
FROM TEST_CASE_CONTROL \
WHERE JOB_INSTANCE_ID = ? \
AND PROCESS_STATUS = 'N'
application.recovery.not.processed= \
SELECT SUB.REGISTER_ID, \
SUB.JOB_INSTANCE_ID, \
SUB.FILELINE, \
SUB.LOAD_DATE, \
SUB.PROCESS_DATE, \
SUB.PROCESS_STATUS, \
SUB.OUTPUT_FILENAME \
FROM (SELECT CONTROL.*, ROW_NUMBER() OVER() AS ROWNUM \
FROM TEST_CASE_CONTROL CONTROL \
WHERE CONTROL.PROCESS_STATUS = 'N' \
AND CONTROL.JOB_INSTANCE_ID = ?) SUB \
WHERE SUB.ROWNUM > ? AND SUB.ROWNUM <= ?
application.recovery.not.processed.test= \
SELECT REGISTER_ID, \
JOB_INSTANCE_ID, \
FILELINE, \
LOAD_DATE, \
PROCESS_DATE, \
PROCESS_STATUS, \
OUTPUT_FILENAME \
FROM TEST_CASE_CONTROL CONTROL \
WHERE CONTROL.PROCESS_STATUS = 'N' \
AND CONTROL.JOB_INSTANCE_ID = ? \
AND REGISTER_ID > ? AND REGISTER_ID <= ?
application.recovery.not.processed.select=REGISTER_ID,\
JOB_INSTANCE_ID,\
FILELINE,\
LOAD_DATE,\
PROCESS_DATE,\
PROCESS_STATUS,\
OUTPUT_FILENAME
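# :jobId, :minId and :maxId below are supplied at runtime through the paging reader's parameterValues map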
application.recovery.not.processed.from=(SELECT CONTROL.*, ROW_NUMBER() OVER() AS ROWNUM \
FROM TEST_CASE_CONTROL CONTROL \
WHERE CONTROL.PROCESS_STATUS = 'N' \
AND CONTROL.JOB_INSTANCE_ID = :jobId) SUB
application.recovery.not.processed.where=SUB.ROWNUM > :minId AND SUB.ROWNUM <= :maxId
application.update= \
UPDATE TEST_CASE_CONTROL \
SET PROCESS_DATE = CURRENT_TIMESTAMP, \
PROCESS_STATUS = :processStatus, \
OUTPUT_FILENAME = :outputFilename \
WHERE REGISTER_ID = :registerId
package com.company.springbatch.testcase.support;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersIncrementer;
public class RunIdIncrementer implements JobParametersIncrementer {
private static final String RUN_ID_KEY = "run.id";
public JobParameters getNext(JobParameters parameters) {
if (parameters == null || parameters.isEmpty()) {
return new JobParametersBuilder().addLong(RUN_ID_KEY, new Long(1L))
.toJobParameters();
}
Long id = new Long(parameters.getLong(RUN_ID_KEY, 1L) + 1);
return new JobParametersBuilder().addLong(RUN_ID_KEY, id)
.toJobParameters();
}
}
package com.company.springbatch.testcase.processor;
import org.apache.log4j.Logger;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import com.company.springbatch.testcase.dao.ApplicationDAO;
public class StartOutputProcess implements Tasklet {
private int registersQtd;
private Integer jobInstanceId;
private ApplicationDAO applicationDAO;
private static Logger log = Logger.getLogger(StartOutputProcess.class);
/**
* @return the registersQtd
*/
public int getRegistersQtd() {
return registersQtd;
}
/**
* @param registersQtd
* the registersQtd to set
*/
public void setRegistersQtd(int registersQtd) {
this.registersQtd = registersQtd;
}
/**
* @return the jobInstanceId
*/
public Integer getJobInstanceId() {
return jobInstanceId;
}
/**
* @param jobInstanceId
* the jobInstanceId to set
*/
public void setJobInstanceId(Integer jobInstanceId) {
this.jobInstanceId = jobInstanceId;
}
/**
* @return the applicationDAO
*/
public ApplicationDAO getApplicationDAO() {
return applicationDAO;
}
/**
* @param applicationDAO
* the applicationDAO to set
*/
public void setApplicationDAO(ApplicationDAO applicationDAO) {
this.applicationDAO = applicationDAO;
}
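/**
* Counts the records still to be processed for this job instance and publishes
* the count in the job ExecutionContext under "registers.qtd", where the
* partitioner picks it up via late binding.
*/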
public RepeatStatus execute(StepContribution contribution,
ChunkContext chunkContext) throws Exception {
log.info("Starting the output files creation");
registersPartitioner();
chunkContext.getStepContext().getStepExecution().getJobExecution()
.getExecutionContext()
.putInt("registers.qtd", this.getRegistersQtd());
return RepeatStatus.FINISHED;
}
private void registersPartitioner() {
log.debug("Executing the method for partitioning registers.");
this.setRegistersQtd(applicationDAO.recoveryRegistersQtd(this
.getJobInstanceId()));
log.debug("Quantity of registers recovered with success: ["
+ this.getRegistersQtd() + "]");
}
}
skip.limit=100
entry.commit.interval=1000
output.commit.interval=1000
output.fetch.size=1
output.files.qtd=10
input.directory=./input
output.directory=./output
# Database pool configurations
database.pool.initialSize=1
database.pool.maxActive=5
database.pool.maxIdle=1
database.pool.minIdle=0
database.pool.timeBetweenEvictionRunsMillis=600000
database.pool.poolPreparedStatements=true
database.pool.maxOpenPreparedStatements=50
database.pool.url=jdbc:derby:./embeddedDatabase;create=true
batch.drop.script=classpath:/org/springframework/batch/core/schema-drop-derby.sql
batch.schema.script=classpath:/org/springframework/batch/core/schema-derby.sql
batch.business.schema.script=classpath:/business-schema-derby.sql
batch.data.source.init=true
package com.company.springbatch.testcase.exception;
public class TestcaseException extends Exception {
private static final long serialVersionUID = 1416003604939837082L;
public TestcaseException(String message) {
super(message);
}
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:batch="http://www.springframework.org/schema/batch" xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-2.5.xsd">
<import resource="classpath:/applicationContext.xml" />
<bean id="processPartitioner" class="com.company.springbatch.testcase.processor.StartOutputProcess" scope="step">
<property name="applicationDAO" ref="applicationDAORef" />
<property name="jobInstanceId" value="#{stepExecution.jobExecution.jobId}" />
</bean>
<bean id="partitioner" class="com.company.springbatch.testcase.support.ControlPartitioner" scope="step">
<property name="registersQtd" value="#{jobExecutionContext[registers.qtd]}" />
<property name="maxThreadNumber" value="${output.files.qtd}" />
<property name="fileName" value="OUTPUT_FILE_" />
</bean>
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="maxPoolSize" value="${output.files.qtd}" />
<property name="corePoolSize" value="${output.files.qtd}" />
</bean>
<bean id="partitionedReadStep" class="org.springframework.batch.core.step.item.FaultTolerantStepFactoryBean">
<property name="jobRepository" ref="jobRepository" />
<property name="transactionManager" ref="transactionManager" />
<property name="itemReader" ref="outputItemReader" />
<property name="itemProcessor" ref="outputItemProcessor" />
<property name="itemWriter" ref="outputItemWriterDelegate" />
<property name="commitInterval" value="${output.commit.interval}" />
<property name="skipLimit" value="${skip.limit}" />
<property name="listeners" ref="outputFileListener" />
</bean>
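<!-- Step-scoped paging reader: each partition reads only the ROWNUM range assigned to it by the partitioner -->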
<bean id="outputItemReader" scope="step" autowire-candidate="false" class="org.springframework.batch.item.database.JdbcPagingItemReader">
<property name="dataSource" ref="dataSource" />
<property name="rowMapper">
<bean class="org.springframework.jdbc.core.BeanPropertyRowMapper">
<property name="mappedClass" value="com.company.springbatch.testcase.to.OutputTO" />
</bean>
</property>
<property name="fetchSize" value="${output.fetch.size}" />
<property name="queryProvider">
<bean class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
<property name="dataSource" ref="dataSource"/>
<property name="selectClause" value="${application.recovery.not.processed.select}"/>
<property name="fromClause" value="${application.recovery.not.processed.from}"/>
<property name="whereClause" value="${application.recovery.not.processed.where}"/>
<property name="sortKey" value="REGISTER_ID"/>
</bean>
</property>
<property name="parameterValues">
<map>
<entry key="jobId" value="#{stepExecution.jobExecution.jobId}"/>
<entry key="minId" value="#{stepExecutionContext[start_control_id]}"/>
<entry key="maxId" value="#{stepExecutionContext[end_control_id]}"/>
</map>
</property>
</bean>
<bean id="oldOutputItemReader" class="org.springframework.batch.item.database.JdbcCursorItemReader" scope="step">
<property name="dataSource" ref="dataSource" />
<property name="sql" value="${application.recovery.not.processed}" />
<property name="rowMapper">
<bean class="org.springframework.jdbc.core.BeanPropertyRowMapper">
<property name="mappedClass" value="com.company.springbatch.testcase.to.OutputTO" />
</bean>
</property>
<property name="fetchSize" value="${output.fetch.size}" />
<property name="preparedStatementSetter">
<bean class="org.springframework.batch.core.resource.ListPreparedStatementSetter">
<property name="parameters">
<list>
<value>#{stepExecution.jobExecution.jobId}</value>
<value>#{stepExecutionContext[start_control_id]}</value>
<value>#{stepExecutionContext[end_control_id]}</value>
</list>
</property>
</bean>
</property>
<property name="verifyCursorPosition" value="false" />
</bean>
<bean id="outputItemProcessor" class="com.company.springbatch.testcase.processor.OutputFileProcessor" scope="step">
<property name="filename" value="#{stepExecutionContext[file_name]}.TXT" />
</bean>
<bean id="outputItemWriterDelegate" class="org.springframework.batch.item.support.CompositeItemWriter" scope="step">
<property name="delegates">
<list>
<ref bean="outputItemWriter" />
<ref bean="updateControlTable" />
</list>
</property>
</bean>
<bean id="outputItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">
<property name="resource" value="file:${output.directory}/#{stepExecutionContext[file_name]}.TXT" />
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
<property name="delimiter">
<util:constant static-field="com.company.springbatch.testcase.constants.ApplicationConstants.DELIMITER_FIELD" />
</property>
<property name="fieldExtractor">
<bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
<property name="names" value="registerId,fileline" />
</bean>
</property>
</bean>
</property>
<property name="shouldDeleteIfEmpty" value="true" />
</bean>
<bean id="updateControlTable" class="org.springframework.batch.item.database.JdbcBatchItemWriter">
<property name="dataSource" ref="dataSource" />
<property name="sql" value="${application.update}" />
<property name="itemSqlParameterSourceProvider">
<bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" />
</property>
</bean>
<bean id="outputFileListener" class="com.company.springbatch.testcase.listener.OutputFileListener" scope="step">
</bean>
</beans>