Skip to content

Instantly share code, notes, and snippets.

@mminella
Last active November 10, 2015 12:59
Show Gist options
  • Save mminella/fe4e2b4ec15fc995ab2a to your computer and use it in GitHub Desktop.
Save mminella/fe4e2b4ec15fc995ab2a to your computer and use it in GitHub Desktop.
Incremental Imports via Spring Batch
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.spring.batch;
import static org.junit.Assert.assertEquals;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.sql.DataSource;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.listener.ExecutionContextPromotionListener;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.support.HsqlPagingQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ResourceLoader;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseFactory;
import org.springframework.jdbc.datasource.init.DatabasePopulatorUtils;
import org.springframework.jdbc.datasource.init.ResourceDatabasePopulator;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.util.ClassUtils;
/**
 * Demonstrates incremental imports with Spring Batch. Each job run reads only the rows of
 * {@code SOURCE_TABLE} inserted since the previous run. The upper bound of each run is stored
 * in the Spring Batch job repository and reused as the lower bound of the next run.
 *
 * @author Michael Minella
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = {LastRunPersistenceTest.JobConfiguration.class})
public class LastRunPersistenceTest {

    /** Number of rows appended by each call to {@link #addData()}. */
    private static final int RECORDS_PER_BATCH = 10;

    // JUnit 4 creates a new test instance per test method, while the Spring context (and with it the
    // embedded database) is cached for the whole class. State that mirrors the database contents is
    // therefore static, so it stays in step with SOURCE_TABLE across test methods.
    private static boolean initialized = false;
    private static int iteration = 0;
    private static Date lastDate = Calendar.getInstance().getTime();

    @Autowired
    private DataSource dataSource;

    @Autowired
    private Job job1;

    @Autowired
    private JobLauncher jobLauncher;

    /**
     * Creates {@code SOURCE_TABLE} the first time any test in this class runs against the shared database.
     */
    @Before
    public void setUp() {
        if (!initialized) {
            JdbcOperations jdbcTemplate = new JdbcTemplate(dataSource);
            jdbcTemplate.execute("CREATE TABLE SOURCE_TABLE (FIELD1 CHAR(20) NOT NULL, " +
                    "FIELD2 VARCHAR(20) NOT NULL, " +
                    "INSERT_DATE TIMESTAMP NOT NULL)");
            initialized = true;
        }
    }

    /**
     * Runs the job seven times against an append-only {@code SOURCE_TABLE}:
     * <ol>
     *   <li>three runs, each preceded by appending 10 new rows, and each expected to read exactly those 10 rows;</li>
     *   <li>one run with no new rows, expected to read nothing;</li>
     *   <li>three more runs with 10 new rows each, expected to read only the new rows.</li>
     * </ol>
     * The position to resume from is stored in the Spring Batch job repository by {@link IncrementingListener}.
     *
     * @throws Exception if a job launch fails
     */
    @Test
    public void test() throws Exception {
        for (int i = 0; i < 3; i++) {
            addData();
            runJobAndAssertReadCount(RECORDS_PER_BATCH);
        }

        // A run without new rows must not disturb the following incremental runs.
        runJobAndAssertReadCount(0);

        for (int i = 0; i < 3; i++) {
            addData();
            runJobAndAssertReadCount(RECORDS_PER_BATCH);
        }
    }

    /**
     * Launches {@code job1} with fresh parameters, so each launch creates a new job instance.
     * Then asserts that the launch completed and that its steps read {@code expectedReadCount} items in total.
     */
    private void runJobAndAssertReadCount(int expectedReadCount) throws Exception {
        JobExecution execution = jobLauncher.run(job1,
                new JobParametersBuilder().addDate("randomDate", new Date()).toJobParameters());

        assertEquals(BatchStatus.COMPLETED, execution.getStatus());

        int readCount = 0;
        for (StepExecution stepExecution : execution.getStepExecutions()) {
            readCount += stepExecution.getReadCount();
        }
        assertEquals(expectedReadCount, readCount);
    }

    /**
     * Appends {@link #RECORDS_PER_BATCH} rows to {@code SOURCE_TABLE}. Each row's INSERT_DATE is one hour
     * after the previous row's, so every new row sorts after everything already in the table.
     */
    private void addData() {
        JdbcOperations jdbcTemplate = new JdbcTemplate(dataSource);
        List<Object[]> params = new ArrayList<Object[]>();

        int max = iteration + RECORDS_PER_BATCH;
        for (; iteration < max; iteration++) {
            lastDate = new Date(lastDate.getTime() + TimeUnit.HOURS.toMillis(1));
            params.add(new Object[] {"field1 - " + iteration, "field2 - " + iteration, lastDate});
        }

        jdbcTemplate.batchUpdate("INSERT INTO SOURCE_TABLE VALUES(?, ?, ?)", params);
    }

    /**
     * Batch configuration: a single-step job that reads a timestamp window of {@code SOURCE_TABLE}
     * and prints each row to standard error.
     */
    @Configuration
    @EnableBatchProcessing
    public static class JobConfiguration {

        @Autowired
        private JobBuilderFactory jobBuilderFactory;

        @Autowired
        private StepBuilderFactory stepBuilderFactory;

        @Autowired
        private ResourceLoader resourceLoader;

        /**
         * Step-scoped reader over the rows whose {@code insert_date} lies in the window
         * {@code (startTimestamp, endTimestamp]}, ordered by {@code insert_date}. Both bounds come from
         * the step execution context, where {@link IncrementingListener} places them before the step runs.
         */
        @Bean
        @StepScope
        public JdbcPagingItemReader<Item> itemReader(@Value("#{stepExecutionContext[startTimestamp]}") Date startTimestamp,
                @Value("#{stepExecutionContext[endTimestamp]}") Date endTimestamp) throws Exception {
            JdbcPagingItemReader<Item> reader = new JdbcPagingItemReader<Item>();
            reader.setDataSource(dataSource());
            reader.setFetchSize(3);
            reader.setRowMapper(new RowMapper<Item>() {
                @Override
                public Item mapRow(ResultSet rs, int rowNum) throws SQLException {
                    Item item = new Item();
                    item.setField1(rs.getString(1));
                    item.setField2(rs.getString(2));
                    // INSERT_DATE is a TIMESTAMP. getDate() would return a java.sql.Date without the time
                    // of day, and rows are one hour apart, so read the full timestamp instead.
                    item.setInsertTime(rs.getTimestamp(3));
                    return item;
                }
            });

            HsqlPagingQueryProvider queryProvider = new HsqlPagingQueryProvider();
            queryProvider.setSelectClause("select field1, field2, insert_date");
            queryProvider.setFromClause("from source_table");
            // Exclusive lower bound: the previous run already processed the row at startTimestamp.
            queryProvider.setWhereClause("insert_date > :startTimestamp and insert_date <= :endTimestamp");
            Map<String, Order> sortKeys = new HashMap<String, Order>();
            sortKeys.put("insert_date", Order.ASCENDING);
            queryProvider.setSortKeys(sortKeys);
            reader.setQueryProvider(queryProvider);

            Map<String, Object> parameters = new HashMap<String, Object>();
            parameters.put("startTimestamp", startTimestamp);
            parameters.put("endTimestamp", endTimestamp);
            reader.setParameterValues(parameters);

            reader.afterPropertiesSet();
            return reader;
        }

        /** Writer that prints each item to standard error. */
        @Bean
        public ItemWriter<Item> itemWriter() {
            return new ItemWriter<Item>() {
                @Override
                public void write(List<? extends Item> items) throws Exception {
                    for (Item item : items) {
                        System.err.println(item);
                    }
                }
            };
        }

        /**
         * Chunk-oriented step (chunk size 3).
         * The incremental listener is registered before the promotion listener so that the window bounds
         * are in the step context before the step reads anything.
         */
        @Bean
        public Step step1() throws Exception {
            return stepBuilderFactory.get("step1")
                    .listener(incrementalListener())
                    .listener(promotionListener())
                    .<Item, Item>chunk(3)
                    .reader(itemReader(null, null))
                    .writer(itemWriter()).build();
        }

        /** Single-step job whose runs are chained together through the promoted {@code endTimestamp}. */
        @Bean
        public Job job1() throws Exception {
            return jobBuilderFactory.get("job1")
                    .start(step1())
                    .build();
        }

        /**
         * Copies {@code startTimestamp} and {@code endTimestamp} from the step context to the job context.
         * This lets the next job instance find them through the job repository.
         */
        @Bean
        public ExecutionContextPromotionListener promotionListener() {
            ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
            listener.setKeys(new String [] {"startTimestamp", "endTimestamp"});
            return listener;
        }

        /** Listener that computes the timestamp window for each step run. */
        @Bean
        public IncrementingListener incrementalListener() {
            return new IncrementingListener(dataSource());
        }

        /** Embedded database used both as the job repository and as the source table's home. */
        @Bean
        public DataSource dataSource() {
            return new EmbeddedDatabaseFactory().getDatabase();
        }

        /**
         * Creates the Spring Batch metadata tables from the {@code schema-hsqldb.sql} script that ships next to
         * {@link Step}. Errors are ignored (continueOnError), e.g. if the script has already been applied.
         */
        @PostConstruct
        protected void initialize() {
            ResourceDatabasePopulator populator = new ResourceDatabasePopulator();
            populator.addScript(
                    resourceLoader.getResource(ClassUtils.addResourcePathToPackagePath(Step.class, "schema-hsqldb.sql")));
            populator.setContinueOnError(true);
            DatabasePopulatorUtils.execute(populator, dataSource());
        }
    }

    /**
     * Before each step, stores the window {@code (startTimestamp, endTimestamp]} in the step execution context.
     * <ul>
     *   <li>{@code endTimestamp} is the current {@code MAX(INSERT_DATE)} of {@code SOURCE_TABLE}.</li>
     *   <li>{@code startTimestamp} is the {@code endTimestamp} promoted by the previous job instance's latest
     *       finished execution, or the epoch when none is available.</li>
     * </ul>
     */
    public static class IncrementingListener {

        private final JdbcOperations template;

        @Autowired
        private JobExplorer jobExplorer;

        public IncrementingListener(DataSource dataSource) {
            template = new JdbcTemplate(dataSource);
        }

        @BeforeStep
        public void beforeStep(StepExecution stepExecution) {
            // May be null when the table is empty. Putting null removes the key, and the reader then matches no rows.
            Date lastTimestamp = template.queryForObject("SELECT MAX(INSERT_DATE) FROM SOURCE_TABLE", Date.class);

            String jobName = stepExecution.getJobExecution().getJobInstance().getJobName();
            Date startTimestamp = previousEndTimestamp(jobName);

            stepExecution.getExecutionContext().put("startTimestamp", startTimestamp);
            stepExecution.getExecutionContext().put("endTimestamp", lastTimestamp);
        }

        /**
         * Returns the {@code endTimestamp} recorded by the job instance immediately before the current one,
         * or the epoch if there is no such instance, no finished execution, or no recorded value.
         * NOTE(review): if the previous instance failed before promotion, this falls back to the epoch,
         * so all rows are reprocessed. Confirm that is the intended recovery policy.
         */
        private Date previousEndTimestamp(String jobName) {
            Date startTimestamp = new Date(0);

            // The most recent instance (index 0) is the one currently running; index 1 is the previous one.
            List<JobInstance> jobInstances = jobExplorer.getJobInstances(jobName, 0, 2);
            if (jobInstances.size() <= 1) {
                return startTimestamp;
            }
            JobInstance lastInstance = jobInstances.get(1);
            if (lastInstance == null) {
                return startTimestamp;
            }

            JobExecution lastExecution = latestFinishedExecution(jobExplorer.getJobExecutions(lastInstance));
            if (lastExecution == null) {
                return startTimestamp;
            }

            Object endTimestamp = lastExecution.getExecutionContext().get("endTimestamp");
            return endTimestamp != null ? (Date) endTimestamp : startTimestamp;
        }

        /**
         * Returns the execution with the latest end time, or {@code null} if none has an end time recorded.
         * Executions without an end time (e.g. still running) are skipped rather than dereferenced.
         */
        private static JobExecution latestFinishedExecution(List<JobExecution> executions) {
            if (executions == null) {
                return null;
            }
            JobExecution latest = null;
            for (JobExecution execution : executions) {
                Date endTime = execution.getEndTime();
                if (endTime == null) {
                    continue;
                }
                if (latest == null || latest.getEndTime().getTime() < endTime.getTime()) {
                    latest = execution;
                }
            }
            return latest;
        }
    }

    /** One row of {@code SOURCE_TABLE}. */
    public static class Item {

        private String field1;
        private String field2;
        private Date insertTime;

        public String getField1() {
            return field1;
        }

        public void setField1(String field1) {
            this.field1 = field1;
        }

        public String getField2() {
            return field2;
        }

        public void setField2(String field2) {
            this.field2 = field2;
        }

        public Date getInsertTime() {
            return insertTime;
        }

        public void setInsertTime(Date insertTime) {
            this.insertTime = insertTime;
        }

        /** Formats as {@code field1 | field2 | insertTime}; a missing insert time prints as {@code null}. */
        @Override
        public String toString() {
            return String.format("%s | %s | %s", field1, field2, insertTime);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment