
@jonatan-ivanov
Last active May 17, 2020 21:04
spring-batch continuous polling example
  • I have a paginated REST API which I would like to poll.
  • Since the source data is growing, I would like to set this up as a scheduled sync job and rerun it from the point where the previous execution left off (even if it was COMPLETED), basically advancing on the new data periodically.
  • AbstractPaginatedDataItemReader seems to be suitable for this.
  • What I'm missing is creating the new JobParameters based on the previous ExecutionContext.
  • It seems DefaultJobParametersExtractor does the trick I need.
  • And it is used from the JobStep class in the way I need.
  • Unfortunately, this is only available to JobStep and I have a TaskletStep.
  • So I ended up reimplementing the parts I need from JobStep and DefaultJobParametersExtractor.

Is there a better way to do this? The code in Example.java is what I would like to get rid of by providing the RunIdIncrementer and a JobParametersExtractor.
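For reference, reusing DefaultJobParametersExtractor directly (rather than reimplementing it) could look roughly like the sketch below. This is an assumption based on the Spring Batch 4 API, not code from this gist; the `(long)` suffix in the key name selects the parameter type, and `lastStepExecution` stands for the step execution of the previous run, however it is obtained:

```java
// Sketch: extract "ResourceReader.index" from the previous run's step execution
// and turn it into JobParameters for the next run (Spring Batch 4 API assumed).
DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor();
extractor.setKeys(new String[] {"ResourceReader.index(long)"}); // "(long)" picks the type

JobParameters nextParameters = extractor.getJobParameters(job, lastStepExecution);
```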

@Configuration
@EnableScheduling
@EnableBatchProcessing
public class BatchConfiguration {

    @Bean
    @StepScope
    public ResourceReader reader(
            @Value("...") String url,
            @Value("...") int pageSize,
            @Value("#{jobParameters['ResourceReader.index']}") int index) {
        return new ResourceReader(url, pageSize, index);
    }

    @Bean
    public Job feedingJob(JobBuilderFactory jobBuilderFactory, Step feedingStep) {
        return jobBuilderFactory.get("feedingJob")
                .incrementer(new RunIdIncrementer())
                .flow(feedingStep)
                .end()
                .build();
    }

    @Bean
    public Step feedingStep(StepBuilderFactory stepBuilderFactory, ResourceReader reader) {
        return stepBuilderFactory.get("feedingStep")
                .<Resource, String>chunk(10)
                .reader(reader)
                .processor((Function<Resource, String>) Resource::getId)
                .writer(System.out::println)
                .build();
    }

    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {...}
}
public class Example {

    private final JobLauncher jobLauncher;
    private final JobExplorer jobExplorer;
    private final Job job;

    @Scheduled(fixedDelayString = "${...}")
    public void run() throws Exception {
        jobLauncher.run(job, createJobParameters());
    }

    private JobParameters createJobParameters() {
        return Optional.of(jobExplorer.getJobInstances(job.getName(), 0, 1))
                .filter(jobInstances -> !jobInstances.isEmpty())
                .map(jobInstances -> jobExplorer.getJobExecutions(jobInstances.get(0)))
                .filter(jobExecutions -> !jobExecutions.isEmpty())
                .map(jobExecutions -> jobExecutions.get(0).getStepExecutions())
                .filter(stepExecutions -> !stepExecutions.isEmpty())
                .map(stepExecutions -> Iterables.get(stepExecutions, 0))
                .map(stepExecution -> createJobParameters(stepExecution.getExecutionContext()))
                .orElseGet(() -> createJobParameters(0));
    }

    private JobParameters createJobParameters(ExecutionContext executionContext) {
        return createJobParameters(executionContext.getLong("ResourceReader.index"));
    }

    private JobParameters createJobParameters(long index) {
        return new JobParametersBuilder(jobExplorer)
                .getNextJobParameters(job)
                .addLong("ResourceReader.index", index) // no addInt, though the index must be an int because of the abstract reader
                .toJobParameters();
    }
}
public class ResourceReader extends AbstractPaginatedDataItemReader<Resource> {

    private final String url;

    public ResourceReader(String url, int pageSize, int index) {
        this.url = url;
        this.pageSize = pageSize;
        setCurrentItemCount(index);
        setName(this.getClass().getSimpleName());
    }

    @Override
    protected Iterator<Resource> doPageRead() {
        return getResources(getIndex(), pageSize).iterator();
    }

    private List<Resource> getResources(int index, int limit) {
        // HTTP call to a paginated API
    }

    @Override
    protected void jumpToItem(int itemLastIndex) {
        // the API is paginated, no need to jump
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        super.update(executionContext);
        // the current item count and the index are int, but JobParameters only has addLong; there is no addInt
        executionContext.putLong(getExecutionContextKey("index"), getIndex());
    }

    private int getIndex() {
        return getCurrentItemCount() - 1;
    }
}
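The resume bookkeeping in the reader above can be simulated without Spring Batch: after each chunk, the persisted index is the number of items read so far minus one, and the next run seeds its item count from that value. A minimal plain-Java sketch (class and method names are illustrative, not part of the reader's API):

```java
import java.util.ArrayList;
import java.util.List;

public class ResumeIndexDemo {

    // Simulates the paginated source; items are just their own indices,
    // and totalAvailable models how much data the source currently has.
    static List<Integer> fetchPage(int offset, int limit, int totalAvailable) {
        List<Integer> page = new ArrayList<>();
        for (int i = offset; i < Math.min(offset + limit, totalAvailable); i++) {
            page.add(i);
        }
        return page;
    }

    // One "job execution": read everything from startIndex onward and return the
    // persisted index (itemsRead - 1, mirroring getCurrentItemCount() - 1 above).
    static int runOnce(int startIndex, int pageSize, int totalAvailable) {
        int itemsRead = startIndex;
        List<Integer> page;
        while (!(page = fetchPage(itemsRead, pageSize, totalAvailable)).isEmpty()) {
            itemsRead += page.size();
        }
        return itemsRead - 1; // what update() would put into the ExecutionContext
    }

    public static void main(String[] args) {
        int persistedIndex = runOnce(0, 10, 25);               // first run reads items 0..24
        System.out.println("persisted index = " + persistedIndex);   // 24
        // the source grew to 40 items; the next run resumes at persisted index + 1
        persistedIndex = runOnce(persistedIndex + 1, 10, 40);  // reads items 25..39
        System.out.println("persisted index = " + persistedIndex);   // 39
    }
}
```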
@fmbenhassine

fmbenhassine commented Jul 18, 2019

You are using the RunIdIncrementer which is not the best option IMO. You need a custom incrementer that increments the index based on the latest value from the previous execution. Here is a quick example to illustrate what I mean:

import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersIncrementer;

public class IndexIncrementer implements JobParametersIncrementer {

	private static final String INDEX_KEY = "index";

	@Override
	public JobParameters getNext(JobParameters previousJobParameters) {

		JobParametersBuilder nextJobParametersBuilder = new JobParametersBuilder();

		long nextIndex = 0L; // or 1 or any other initial value
		if (previousJobParameters != null) {
			Long previousIndex = previousJobParameters.getLong(INDEX_KEY);
			if (previousIndex != null) {
				nextIndex = previousIndex + 1; // or + any other increment
			}
		}

		return nextJobParametersBuilder
				.addLong(INDEX_KEY, nextIndex)
				.toJobParameters();
	}
}

With that, you can use a job operator to start the next instance like:

@Scheduled(fixedDelayString = "${...}")
public void run() throws Exception {
   jobOperator.startNextInstance("myJob");
}

And get rid of the createJobParameters methods.

Hope this helps.

NB: The JobParametersIncrementer is only called when you use the JobOperator or CommandLineJobRunner to run the next instance of a job. So in your example, the RunIdIncrementer has no effect.

@jonatan-ivanov
Author

Thank you very much, I really appreciate your help, especially your effort in providing the example. I have a few other questions, could you please take a look?

I'm not sure your example will work in my use case, could you please verify my thought process? When the job is finished, I don't know how much data was actually written, so based on the previous JobParameters I don't know by how much I should increment the index. Maybe it wrote thousands of items, maybe zero, maybe it failed after the 10th item. So in the next round, I would like to know where the previous execution stopped, not where it started.
So the only way to know where the previous execution stopped is to look into its ExecutionContext, where this information is persisted after every write, right?

I'm only using the RunIdIncrementer to increment the run.id (so that I will not get an exception for duplicate JobInstances). The index is "incremented" via getting it from the previous ExecutionContext: createJobParameters(job, executionContext.getLong("ResourceReader.index")).

The JobParametersIncrementer (RunIdIncrementer in my case) is called because I'm calling .getNextJobParameters(job) on the JobParametersBuilder, and it also seems to work. (Example.java line#29)

The JobOperator is a very good point and I think I will use it. The only thing I need to take care of (this is not in the example above) is that I want to start the job both from the scheduled method and from a controller. For the scheduled method, I want the start/run call to block, while from the controller I want a non-blocking call. Right now I have two JobLaunchers and I'm injecting each into the right place. I might need to create two JobOperators, one using the blocking JobLauncher and the other the non-blocking one.
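For the blocking vs. non-blocking split, one common setup is a second JobLauncher backed by an async TaskExecutor, wired into its own JobOperator. The sketch below assumes Spring Batch 4's SimpleJobLauncher and SimpleJobOperator; the bean names are illustrative:

```java
@Bean
public JobLauncher asyncJobLauncher(JobRepository jobRepository) throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    // hand the execution off to another thread so run() returns immediately
    jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
}

@Bean
public JobOperator asyncJobOperator(JobLauncher asyncJobLauncher, JobRegistry jobRegistry,
        JobExplorer jobExplorer, JobRepository jobRepository) {
    SimpleJobOperator jobOperator = new SimpleJobOperator();
    jobOperator.setJobLauncher(asyncJobLauncher); // the non-blocking launcher
    jobOperator.setJobRegistry(jobRegistry);
    jobOperator.setJobExplorer(jobExplorer);
    jobOperator.setJobRepository(jobRepository);
    return jobOperator;
}
```

The controller would then call startNextInstance on this operator, while the scheduled method uses the default (synchronous) one.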

@fmbenhassine

Thanks for the clarifications. I thought the increment was a fixed value, but now I understand it is rather dynamic. In that case, we can always use a custom incrementer that gets the value from the execution context of the last run. Here is an updated version of the incrementer with a complete example:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersIncrementer;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.support.SimpleJobOperator;
import org.springframework.batch.core.listener.ExecutionContextPromotionListener;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class MyJob {

	@Autowired
	private JobBuilderFactory jobs;

	@Autowired
	private StepBuilderFactory steps;

	@Bean
	@StepScope
	public ItemStreamReader<Integer> itemReader(@Value("#{jobParameters['index']}") Integer index) {
		class MyItemReader implements ItemStreamReader<Integer> {
			private List<Integer> items = new ArrayList<>();
			private Iterator<Integer> iterator;
			private int random;

			@Override
			public void open(ExecutionContext executionContext) throws ItemStreamException {
				random = new Random().nextInt(20);
				items.clear();
				for (int i = index; i < random; i++) {
					items.add(i);
				}
				iterator = items.iterator();
			}

			@Override
			public void update(ExecutionContext executionContext) throws ItemStreamException {
				executionContext.put("index", (long) (random - 1));
			}

			@Override
			public void close() throws ItemStreamException {

			}

			@Override
			public Integer read() {
				return iterator.hasNext()? iterator.next() : null;
			}
		}

		return new MyItemReader();
	}

	@Bean
	public ItemWriter<Integer> itemWriter() {
		return items -> {
			for (Integer item : items) {
				System.out.println("writing item = " + item);
			}
		};
	}

	@Bean
	public Step step() {
		return steps.get("step")
				.<Integer, Integer>chunk(5)
				.reader(itemReader(null))
				.writer(itemWriter())
				.listener(promotionListener())
				.build();
	}

	// this will promote the key "index" from the step execution context (updated by the reader) to the job execution
	// context (from which the incrementer will get the value)
	private ExecutionContextPromotionListener promotionListener() {
		ExecutionContextPromotionListener promotionListener = new ExecutionContextPromotionListener();
		promotionListener.setKeys(new String[] {"index"});
		return promotionListener;
	}

	@Bean
	public Job job() {
		return jobs.get("job")
				.incrementer(incrementer(null))
				.start(step())
				.build();
	}

	@Bean
	public JobParametersIncrementer incrementer(JobRepository jobRepository) {
		return new IndexIncrementer(jobRepository);
	}

	@Bean
	public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
		JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();
		jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry);
		return jobRegistryBeanPostProcessor;
	}

	@Bean
	public JobOperator jobOperator(
			JobLauncher jobLauncher,
			JobRegistry jobRegistry,
			JobExplorer jobExplorer,
			JobRepository jobRepository
	) {
		SimpleJobOperator jobOperator = new SimpleJobOperator();
		jobOperator.setJobExplorer(jobExplorer);
		jobOperator.setJobLauncher(jobLauncher);
		jobOperator.setJobRegistry(jobRegistry);
		jobOperator.setJobRepository(jobRepository);
		return jobOperator;
	}

	class IndexIncrementer implements JobParametersIncrementer {

		private static final String INDEX_KEY = "index";

		private JobRepository jobRepository;

		IndexIncrementer(JobRepository jobRepository) {
			this.jobRepository = jobRepository;
		}

		@Override
		public JobParameters getNext(JobParameters previousJobParameters) {
			JobParametersBuilder nextJobParametersBuilder = new JobParametersBuilder();
			long nextIndex = 0L; // or 1 or any other initial value

			// get last index from previous job execution context
			JobExecution lastJobExecution = jobRepository.getLastJobExecution("job", previousJobParameters);
			if (lastJobExecution != null) {
				Long lastIndex = (Long) lastJobExecution.getExecutionContext().get(INDEX_KEY);
				if (lastIndex != null) {
					nextIndex = lastIndex + 1;
				}
			}
			return nextJobParametersBuilder
					.addLong(INDEX_KEY, nextIndex)
					.toJobParameters();
		}
	}

	public static void main(String[] args) throws Exception {
		ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
		JobOperator jobOperator = context.getBean(JobOperator.class);

		// start first instance (index 0)
		jobOperator.startNextInstance("job");

		// start next instance (index calculated from previous execution)
		jobOperator.startNextInstance("job");

		// start next instance (index calculated from previous execution)
		jobOperator.startNextInstance("job");
	}

}

You can ignore my reader, I created it to simulate things. Your reader already puts the index in the execution context of the step; you would need to promote it to the job execution context because this is where the incrementer would get it.

The example uses a random value (and might fail if the same random value is generated twice, since the job parameters would then be identical), but if you run it you will see that each run starts from the previous index:

[info 2019/07/19 12:42:09.955 CEST <main> tid=0x1] Bean 'org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration' of type [org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$$EnhancerBySpringCGLIB$$d4ffe7b2] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
[info 2019/07/19 12:42:09.997 CEST <main> tid=0x1] Bean 'jobRepository' of type [com.sun.proxy.$Proxy31] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
[info 2019/07/19 12:42:09.999 CEST <main> tid=0x1] Bean 'jobBuilders' of type [org.springframework.batch.core.configuration.annotation.JobBuilderFactory] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
[info 2019/07/19 12:42:10.005 CEST <main> tid=0x1] Bean 'transactionManager' of type [com.sun.proxy.$Proxy33] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
[info 2019/07/19 12:42:10.006 CEST <main> tid=0x1] Bean 'stepBuilders' of type [org.springframework.batch.core.configuration.annotation.StepBuilderFactory] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
[info 2019/07/19 12:42:10.007 CEST <main> tid=0x1] Bean 'myJob' of type [org.springframework.batch.sample.MyJob$$EnhancerBySpringCGLIB$$a3caf2b9] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
[info 2019/07/19 12:42:10.013 CEST <main> tid=0x1] Bean 'jobRegistry' of type [com.sun.proxy.$Proxy34] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
[warn 2019/07/19 12:42:10.080 CEST <main> tid=0x1] org.springframework.batch.item.ItemStreamReader is an interface. The implementing class will not be queried for annotation based listener configurations. If using @StepScope on a @Bean method, be sure to return the implementing class so listener annotations can be used.
[warn 2019/07/19 12:42:10.107 CEST <main> tid=0x1] No datasource was provided...using a Map based JobRepository
[warn 2019/07/19 12:42:10.108 CEST <main> tid=0x1] No transaction manager was provided, using a ResourcelessTransactionManager
[info 2019/07/19 12:42:10.241 CEST <main> tid=0x1] No TaskExecutor has been set, defaulting to synchronous executor.
[info 2019/07/19 12:42:10.265 CEST <main> tid=0x1] Locating parameters for next instance of job with name=job
[info 2019/07/19 12:42:10.278 CEST <main> tid=0x1] Attempting to launch job with name=job and parameters={index=0}
[info 2019/07/19 12:42:10.304 CEST <main> tid=0x1] Job: [SimpleJob: [name=job]] launched with the following parameters: [{index=0}]
[info 2019/07/19 12:42:10.345 CEST <main> tid=0x1] Executing step: [step]
writing item = 0
writing item = 1
writing item = 2
writing item = 3
writing item = 4
writing item = 5
writing item = 6
[info 2019/07/19 12:42:10.470 CEST <main> tid=0x1] Step: [step] executed in 123ms
[info 2019/07/19 12:42:10.477 CEST <main> tid=0x1] Job: [SimpleJob: [name=job]] completed with the following parameters: [{index=0}] and the following status: [COMPLETED] in 142ms
[info 2019/07/19 12:42:10.477 CEST <main> tid=0x1] Locating parameters for next instance of job with name=job
[info 2019/07/19 12:42:10.491 CEST <main> tid=0x1] Attempting to launch job with name=job and parameters={index=7}
[info 2019/07/19 12:42:10.494 CEST <main> tid=0x1] Job: [SimpleJob: [name=job]] launched with the following parameters: [{index=7}]
[info 2019/07/19 12:42:10.500 CEST <main> tid=0x1] Executing step: [step]
writing item = 7
writing item = 8
writing item = 9
writing item = 10
writing item = 11
writing item = 12
[info 2019/07/19 12:42:10.506 CEST <main> tid=0x1] Step: [step] executed in 6ms
[info 2019/07/19 12:42:10.510 CEST <main> tid=0x1] Job: [SimpleJob: [name=job]] completed with the following parameters: [{index=7}] and the following status: [COMPLETED] in 14ms
[info 2019/07/19 12:42:10.511 CEST <main> tid=0x1] Locating parameters for next instance of job with name=job
[info 2019/07/19 12:42:10.516 CEST <main> tid=0x1] Attempting to launch job with name=job and parameters={index=13}
[info 2019/07/19 12:42:10.517 CEST <main> tid=0x1] Job: [SimpleJob: [name=job]] launched with the following parameters: [{index=13}]
[info 2019/07/19 12:42:10.521 CEST <main> tid=0x1] Executing step: [step]
writing item = 13
writing item = 14
writing item = 15
writing item = 16
[info 2019/07/19 12:42:10.527 CEST <main> tid=0x1] Step: [step] executed in 6ms
[info 2019/07/19 12:42:10.530 CEST <main> tid=0x1] Job: [SimpleJob: [name=job]] completed with the following parameters: [{index=13}] and the following status: [COMPLETED] in 11ms
