Skip to content

Instantly share code, notes, and snippets.

@cataphract
Created February 19, 2016 17:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save cataphract/e2d3f6cec0d85d91120a to your computer and use it in GitHub Desktop.
Save cataphract/e2d3f6cec0d85d91120a to your computer and use it in GitHub Desktop.
import org.springframework.batch.core.*
@Grab('org.springframework.batch:spring-batch-core:3.0.6.RELEASE')
import org.springframework.batch.core.configuration.annotation.*
import org.springframework.batch.core.job.builder.FlowBuilder
import org.springframework.batch.core.job.flow.Flow
import org.springframework.batch.core.launch.JobLauncher
import org.springframework.batch.core.listener.ExecutionContextPromotionListener
import org.springframework.batch.core.partition.support.Partitioner
import org.springframework.batch.core.scope.context.ChunkContext
import org.springframework.batch.core.scope.context.StepSynchronizationManager
import org.springframework.batch.core.step.tasklet.Tasklet
import org.springframework.batch.item.ExecutionContext
import org.springframework.batch.repeat.RepeatStatus
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.AnnotationConfigApplicationContext
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
// Spring Batch configuration for the demo job: step1 records how many items
// there are, then a partitioned step runs PerIndexTasklet once per partition.
@Configuration
@EnableBatchProcessing
class AppConfig {
// Execution-context key under which FindNumberOfJobsTasklet stores the item
// count; the same key appears literally in partitionedStep's @Value expression
public final static String NUMBER_OF_ITEMS_KEY = 'NUMBER_OF_ITEMS'
// Index on which PerIndexTasklet throws; null never matches an index
public static Integer failOnItem = null
@Autowired
private JobBuilderFactory jobs
@Autowired
private StepBuilderFactory steps
// Builds 'myJob' as a single flow: step1 followed by the partitioned step.
@Bean
public Job job() {
// The job-scoped step is wrapped so that the flow can be assembled under a
// fixed name (see the comments on RegisterStepScopeOnExecutionStep).
// NOTE(review): the null argument is presumably never used because the call
// resolves to the scoped bean instead of running the method body — confirm.
Step partitionedStep = new RegisterStepScopeOnExecutionStep(
name: 'partitionedStep',
delegate: partitionedStep(null))
Flow mainFlow = new FlowBuilder('mainFlow')
.start(step1())
.next(partitionedStep)
.build()
// First build() finishes the flow definition, second build() yields the Job
jobs.get('myJob')
.start(mainFlow)
.build().build()
}
// Step that runs FindNumberOfJobsTasklet. The promotion listener is configured
// with NUMBER_OF_ITEMS_KEY; partitionedStep later reads that key from the job
// execution context.
private Step step1() {
this.steps.get('step1')
.tasklet(new FindNumberOfJobsTasklet())
.listener(new ExecutionContextPromotionListener(keys: [NUMBER_OF_ITEMS_KEY]))
.build()
}
// Partitioned step whose partition count n is injected from the job execution
// context. Declared job-scoped, presumably so that n is resolved only once the
// job is running — confirm.
@Bean
@JobScope
Step partitionedStep(@Value('#{jobExecutionContext["NUMBER_OF_ITEMS"]}') n) {
// One entry per index 1..n: 'partition_<i>' -> ExecutionContext holding
// partitionIndex = i. The gridSize argument is ignored.
// NOTE(review): for n == 0 the Groovy range 1..n counts down (1, 0) instead
// of being empty — confirm n is always >= 1.
Partitioner partitioner = { int gridSize ->
(1..n).collectEntries { i ->
['partition_' + i, new ExecutionContext(partitionIndex: i)]
}
} as Partitioner
steps.get('partitionedStep')
.partitioner(repeatedStep())
.partitioner('partitionedStep', partitioner)
.gridSize(n)
.build()
}
// The step executed for each partition; it runs the step-scoped PerIndexTasklet.
@Bean
Step repeatedStep() {
steps.get('repeatedStep').tasklet(perIndexTasklet()).build()
}
// Step-scoped because PerIndexTasklet injects a value from stepExecutionContext.
@Bean
@StepScope
PerIndexTasklet perIndexTasklet() {
new PerIndexTasklet()
}
}
/**
 * Step wrapper that forwards the Step interface to {@code delegate} (via
 * Groovy's {@code @Delegate}) but answers {@code getName()} itself and
 * registers the step execution with StepSynchronizationManager around the
 * delegate's execution.
 */
class RegisterStepScopeOnExecutionStep implements Step {
    @Delegate
    Step delegate

    String name

    // Spring-batch likes to call getName() when building the job.
    // Override this so that we avoid the scoped target bean from being
    // instantiated before it should.
    @Override
    String getName() {
        name
    }

    /**
     * Executes the delegate with the step execution registered as the current
     * step context.
     *
     * Unfortunately the task executors don't properly propagate the job or
     * the step scope, so propagation of the step scope is forced here.
     * See https://jira.spring.io/browse/BATCH-2269
     *
     * @param stepExecution the execution handed to the delegate
     * @throws JobInterruptedException propagated from the delegate
     */
    @Override
    void execute(StepExecution stepExecution) throws JobInterruptedException {
        // Register before entering the try block so that close() only runs for
        // a registration that actually happened.
        StepSynchronizationManager.register(stepExecution)
        try {
            delegate.execute(stepExecution)
        } finally {
            StepSynchronizationManager.close()
        }
    }
}
/**
 * Tasklet run once per partition. It prints its partition index, or throws
 * when that index equals AppConfig.failOnItem.
 */
class PerIndexTasklet implements Tasklet {
    // Partition index injected from the step execution context
    @Value("#{stepExecutionContext['partitionIndex']}")
    int i

    /**
     * @return RepeatStatus.FINISHED once the index has been printed
     * @throws RuntimeException when i equals AppConfig.failOnItem
     */
    @Override
    RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // A null failOnItem never equals an int index, so nothing fails then
        if (i == AppConfig.failOnItem) {
            throw new RuntimeException("Failing on index $i")
        }
        println "Finish index $i"
        // Be explicit instead of implicitly returning println's null result
        return RepeatStatus.FINISHED
    }
}
/**
 * Tasklet that stores a fixed item count (3) in the step execution context
 * under AppConfig.NUMBER_OF_ITEMS_KEY.
 */
class FindNumberOfJobsTasklet implements Tasklet {
    /**
     * @return RepeatStatus.FINISHED after the count has been stored
     */
    @Override
    RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        def stepExecutionContext = chunkContext.stepContext.stepExecution.executionContext
        stepExecutionContext.put(AppConfig.NUMBER_OF_ITEMS_KEY, 3)
        // Be explicit instead of implicitly returning the void put() result (null)
        return RepeatStatus.FINISHED
    }
}
// Bootstrap the Spring context from AppConfig and look up the job launcher
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(AppConfig)
JobLauncher launcher = ctx.getBean(JobLauncher)
/**
 * Prints the overall status of a job execution followed by one line per step
 * execution (id, step name, and the status unless it is COMPLETED).
 */
void showStatus(JobExecution jobExecution) {
    println "Status is: ${jobExecution.status}"
    println "Step executions: "
    for (StepExecution stepExec in jobExecution.stepExecutions) {
        // Completed steps get an empty suffix so only the other statuses stand out
        def statusSuffix = stepExec.status.toString() == 'COMPLETED' ? '' : stepExec.status
        println " ${stepExec.id}: ${stepExec.stepName} ${statusSuffix}"
    }
}
// Run the job twice with the same (empty) parameters: first with index 2
// configured to fail, then with no failing index.
try {
    for (Integer failingIndex in [2, null]) {
        AppConfig.failOnItem = failingIndex
        def jobExecution = launcher.run(ctx.getBean(Job), new JobParameters([:]))
        showStatus(jobExecution)
    }
} finally {
    // Close the application context even when a run throws
    ctx.close()
}
import org.springframework.batch.core.*
@Grab('org.springframework.batch:spring-batch-core:3.0.6.RELEASE')
import org.springframework.batch.core.configuration.annotation.*
import org.springframework.batch.core.job.builder.FlowBuilder
import org.springframework.batch.core.job.flow.Flow
import org.springframework.batch.core.launch.JobLauncher
import org.springframework.batch.core.listener.ExecutionContextPromotionListener
import org.springframework.batch.core.partition.support.Partitioner
import org.springframework.batch.core.scope.context.ChunkContext
import org.springframework.batch.core.scope.context.StepSynchronizationManager
import org.springframework.batch.core.step.tasklet.Tasklet
import org.springframework.batch.item.ExecutionContext
import org.springframework.batch.repeat.RepeatStatus
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.AnnotationConfigApplicationContext
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
// Spring Batch configuration for the demo job: step1 records how many items
// there are, then a partitioned step runs a per-partition flow whose inner
// step is named after the partition index.
@Configuration
@EnableBatchProcessing
class AppConfig {
// Execution-context key under which FindNumberOfJobsTasklet stores the item
// count; the same key appears literally in partitionedStep's @Value expression
public final static String NUMBER_OF_ITEMS_KEY = 'NUMBER_OF_ITEMS'
// Index on which PerIndexTasklet throws; null never matches an index
public static Integer failOnItem = null
@Autowired
private JobBuilderFactory jobs
@Autowired
private StepBuilderFactory steps
// Builds 'myJob' as a single flow: step1 followed by the partitioned step.
@Bean
public Job job() {
// The job-scoped step is wrapped so that the flow can be assembled under a
// fixed name (see the comments on RegisterStepScopeOnExecutionStep).
// NOTE(review): the null argument is presumably never used because the call
// resolves to the scoped bean instead of running the method body — confirm.
Step partitionedStep = new RegisterStepScopeOnExecutionStep(
name: 'partitionedStep',
delegate: partitionedStep(null))
Flow mainFlow = new FlowBuilder('mainFlow')
.start(step1())
.next(partitionedStep)
.build()
// First build() finishes the flow definition, second build() yields the Job
jobs.get('myJob')
.start(mainFlow)
.build().build()
}
// Step that runs FindNumberOfJobsTasklet. The promotion listener is configured
// with NUMBER_OF_ITEMS_KEY; partitionedStep later reads that key from the job
// execution context.
private Step step1() {
this.steps.get('step1')
.tasklet(new FindNumberOfJobsTasklet())
.listener(new ExecutionContextPromotionListener(keys: [NUMBER_OF_ITEMS_KEY]))
.build()
}
// Partitioned step whose partition count n is injected from the job execution
// context. Declared job-scoped, presumably so that n is resolved only once the
// job is running — confirm.
@Bean
@JobScope
Step partitionedStep(@Value('#{jobExecutionContext["NUMBER_OF_ITEMS"]}') n) {
// One entry per index 1..n: 'partition_<i>' -> ExecutionContext holding
// partitionIndex = i. The gridSize argument is ignored.
// NOTE(review): for n == 0 the Groovy range 1..n counts down (1, 0) instead
// of being empty — confirm n is always >= 1.
Partitioner partitioner = { int gridSize ->
(1..n).collectEntries { i ->
['partition_' + i, new ExecutionContext(partitionIndex: i)]
}
} as Partitioner
// The per-partition step is a flow step around the step-scoped flow, wrapped
// so that only execute() reaches it (see OnlyDelegateExecuteStep).
// NOTE(review): as in job(), the null argument is presumably never used — confirm.
Step repeatedStep = new OnlyDelegateExecuteStep(
name: 'partitionedStep',
delegate: steps.get('partitionedStep').flow(repeatedStepFlow(null)).build())
steps.get('partitionedStep')
.partitioner(repeatedStep)
.partitioner('partitionedStep', partitioner)
.gridSize(n)
.build()
}
// Step-scoped flow containing a single tasklet step named
// "repeatedStep-<partitionIndex>", with the index taken from the step
// execution context.
@Bean
@StepScope
Flow repeatedStepFlow(@Value("#{stepExecutionContext['partitionIndex']}") Integer partitionIndex) {
new FlowBuilder('repeatedStepFlow')
.start(steps.get("repeatedStep-$partitionIndex").tasklet(perIndexTasklet()).build())
.build()
}
// Step-scoped because PerIndexTasklet injects a value from stepExecutionContext.
@Bean
@StepScope
PerIndexTasklet perIndexTasklet() {
new PerIndexTasklet()
}
}
/**
 * Step wrapper that forwards the Step interface to {@code delegate} (via
 * Groovy's {@code @Delegate}) but answers {@code getName()} itself and
 * registers the step execution with StepSynchronizationManager around the
 * delegate's execution.
 */
class RegisterStepScopeOnExecutionStep implements Step {
    @Delegate
    Step delegate

    String name

    // Spring-batch likes to call getName() when building the job.
    // Override this so that we avoid the scoped target bean from being
    // instantiated before it should.
    @Override
    String getName() {
        name
    }

    /**
     * Executes the delegate with the step execution registered as the current
     * step context.
     *
     * Unfortunately the task executors don't properly propagate the job or
     * the step scope, so propagation of the step scope is forced here.
     * See https://jira.spring.io/browse/BATCH-2269
     *
     * @param stepExecution the execution handed to the delegate
     * @throws JobInterruptedException propagated from the delegate
     */
    @Override
    void execute(StepExecution stepExecution) throws JobInterruptedException {
        // Register before entering the try block so that close() only runs for
        // a registration that actually happened.
        StepSynchronizationManager.register(stepExecution)
        try {
            delegate.execute(stepExecution)
        } finally {
            StepSynchronizationManager.close()
        }
    }
}
/**
 * Step wrapper that answers name, startLimit and allowStartIfComplete from
 * its own properties and forwards only execute() to the wrapped step.
 */
class OnlyDelegateExecuteStep implements Step {
    // In a partitioned context, calls to these three must be answered here
    // so that they do not instantiate the scoped bean behind the delegate.
    String name
    int startLimit = Integer.MAX_VALUE
    boolean allowStartIfComplete = false

    // The wrapped step; touched only when the step is actually executed
    Step delegate

    @Override
    void execute(StepExecution stepExecution) throws JobInterruptedException {
        this.delegate.execute(stepExecution)
    }
}
/**
 * Tasklet run once per partition. It prints its partition index, or throws
 * when that index equals AppConfig.failOnItem.
 */
class PerIndexTasklet implements Tasklet {
    // Partition index injected from the step execution context
    @Value("#{stepExecutionContext['partitionIndex']}")
    int i

    /**
     * @return RepeatStatus.FINISHED once the index has been printed
     * @throws RuntimeException when i equals AppConfig.failOnItem
     */
    @Override
    RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // A null failOnItem never equals an int index, so nothing fails then
        if (i == AppConfig.failOnItem) {
            throw new RuntimeException("Failing on index $i")
        }
        println "Finish index $i"
        // Be explicit instead of implicitly returning println's null result
        return RepeatStatus.FINISHED
    }
}
/**
 * Tasklet that stores a fixed item count (3) in the step execution context
 * under AppConfig.NUMBER_OF_ITEMS_KEY.
 */
class FindNumberOfJobsTasklet implements Tasklet {
    /**
     * @return RepeatStatus.FINISHED after the count has been stored
     */
    @Override
    RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        def stepExecutionContext = chunkContext.stepContext.stepExecution.executionContext
        stepExecutionContext.put(AppConfig.NUMBER_OF_ITEMS_KEY, 3)
        // Be explicit instead of implicitly returning the void put() result (null)
        return RepeatStatus.FINISHED
    }
}
// Bootstrap the Spring context from AppConfig and look up the job launcher
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(AppConfig)
JobLauncher launcher = ctx.getBean(JobLauncher)
/**
 * Prints the overall status of a job execution followed by one line per step
 * execution (id, step name, and the status unless it is COMPLETED).
 */
void showStatus(JobExecution jobExecution) {
    println "Status is: ${jobExecution.status}"
    println "Step executions: "
    for (StepExecution stepExec in jobExecution.stepExecutions) {
        // Completed steps get an empty suffix so only the other statuses stand out
        def statusSuffix = stepExec.status.toString() == 'COMPLETED' ? '' : stepExec.status
        println " ${stepExec.id}: ${stepExec.stepName} ${statusSuffix}"
    }
}
// Run the job twice with the same (empty) parameters: first with index 2
// configured to fail, then with no failing index.
try {
    for (Integer failingIndex in [2, null]) {
        AppConfig.failOnItem = failingIndex
        def jobExecution = launcher.run(ctx.getBean(Job), new JobParameters([:]))
        showStatus(jobExecution)
    }
} finally {
    // Close the application context even when a run throws
    ctx.close()
}
import org.springframework.batch.core.*
@Grab('org.springframework.batch:spring-batch-core:3.0.6.RELEASE')
import org.springframework.batch.core.Job
import org.springframework.batch.core.configuration.annotation.*
import org.springframework.batch.core.job.builder.FlowBuilder
import org.springframework.batch.core.job.flow.Flow
import org.springframework.batch.core.job.flow.FlowExecutionStatus
import org.springframework.batch.core.job.flow.JobExecutionDecider
import org.springframework.batch.core.launch.JobLauncher
import org.springframework.batch.core.listener.ExecutionContextPromotionListener
import org.springframework.batch.core.scope.context.ChunkContext
import org.springframework.batch.core.step.tasklet.Tasklet
import org.springframework.batch.repeat.RepeatStatus
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.AnnotationConfigApplicationContext
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
// Spring Batch configuration for the demo job: step1 records how many items
// there are, then a decider routes to a wrapping step for as long as it
// answers 'CONTINUE'.
@Configuration
@EnableBatchProcessing
class AppConfig {
// Execution-context key under which FindNumberOfJobsTasklet stores the item
// count; the same key appears literally in Decider's @Value expression
public final static String NUMBER_OF_ITEMS_KEY = 'NUMBER_OF_ITEMS'
// Index on which PerIndexTasklet throws; null never matches an index
public static Integer failOnItem = null
@Autowired
private JobBuilderFactory jobs
@Autowired
private StepBuilderFactory steps
// Builds 'myJob': step1, then decider -> wrappingStep -> decider -> wrappingStep,
// with each transition into wrappingStep taken on the 'CONTINUE' status.
@Bean
public Job job() {
// Flow step around the step-scoped flow, wrapped to carry a fixed name.
// NOTE(review): allowStartIfComplete(true) is presumably what lets the same
// step run again after it has completed — confirm.
Step wrappingStep = new OverriddenNameStep(
name: 'wrappingStep',
delegate: steps.get('wrappingStep').flow(stepWrappingFlow()).allowStartIfComplete(true).build())
jobs.get("myJob")
.start(step1())
.next(decider())
.on('CONTINUE')
.to(wrappingStep)
.next(decider())
.on('CONTINUE') // we need to repeat these two lines
.to(wrappingStep)
.build()
.build()
}
// Step that runs FindNumberOfJobsTasklet. The promotion listener is configured
// with NUMBER_OF_ITEMS_KEY; Decider later reads that key from the job
// execution context.
private Step step1() {
this.steps.get('step1')
.tasklet(new FindNumberOfJobsTasklet())
.listener(new ExecutionContextPromotionListener(keys: [NUMBER_OF_ITEMS_KEY]))
.build()
}
// Job-scoped decider; its cur counter is referenced by the SpEL expressions
// '#{decider.cur}' in repeatedStep and in PerIndexTasklet.
@Bean
@JobScope
Decider decider() {
new Decider()
}
// Step-scoped flow consisting only of the repeated step.
// NOTE(review): the null argument is presumably never used because the call
// resolves to the scoped bean instead of running the method body — confirm.
@Bean
@StepScope
Flow stepWrappingFlow() {
new FlowBuilder('stepWrapperFlow')
.start(repeatedStep(null))
.build()
}
// Step-scoped tasklet step named "repeated-<currentIndex>", where
// currentIndex is injected from the decider's cur property.
@Bean
@StepScope
Step repeatedStep(@Value('#{decider.cur}') Integer currentIndex) {
steps.get("repeated-$currentIndex")
.tasklet(perIndexTasklet())
.build()
}
// Step-scoped tasklet instance (PerIndexTasklet injects '#{decider.cur}').
@Bean
@StepScope
PerIndexTasklet perIndexTasklet() {
new PerIndexTasklet()
}
}
// Step wrapper that carries its own name property while forwarding the Step
// interface to delegate through Groovy's @Delegate.
// NOTE(review): presumably the name property's getter takes precedence over the
// delegated getName(), so the delegate is not asked for its name — confirm.
class OverriddenNameStep implements Step {
// Name supplied by the caller (AppConfig.job() passes 'wrappingStep')
String name
// The wrapped step that receives the delegated calls
@Delegate
Step delegate
}
/**
 * Job-scoped decider that counts its own invocations and keeps answering
 * 'CONTINUE' until the count exceeds the number of items.
 */
class Decider implements JobExecutionDecider {
    // Number of items, injected from the job execution context
    @Value('#{jobExecutionContext["NUMBER_OF_ITEMS"]}')
    Integer n

    // Index of the current iteration; incremented on every decide() call
    int cur = 0

    /**
     * Advances the counter and reports whether another iteration is due.
     *
     * @return FlowExecutionStatus.COMPLETED once cur has passed n, otherwise a
     *         status named 'CONTINUE'
     */
    @Override
    FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        cur += 1
        boolean pastLastItem = cur > n
        return pastLastItem ? FlowExecutionStatus.COMPLETED : new FlowExecutionStatus('CONTINUE')
    }
}
/**
 * Tasklet run once per iteration. It prints the current index, or throws
 * when that index equals AppConfig.failOnItem.
 */
class PerIndexTasklet implements Tasklet {
    // Current index, injected from the decider bean's cur property
    @Value('#{decider.cur}')
    int i

    /**
     * @return RepeatStatus.FINISHED once the index has been printed
     * @throws RuntimeException when i equals AppConfig.failOnItem
     */
    @Override
    RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        // A null failOnItem never equals an int index, so nothing fails then
        if (i == AppConfig.failOnItem) {
            throw new RuntimeException("Failing on index $i")
        }
        println "Finish index $i"
        // Be explicit instead of implicitly returning println's null result
        return RepeatStatus.FINISHED
    }
}
/**
 * Tasklet that stores a fixed item count (3) in the step execution context
 * under AppConfig.NUMBER_OF_ITEMS_KEY.
 */
class FindNumberOfJobsTasklet implements Tasklet {
    /**
     * @return RepeatStatus.FINISHED after the count has been stored
     */
    @Override
    RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        def stepExecutionContext = chunkContext.stepContext.stepExecution.executionContext
        stepExecutionContext.put(AppConfig.NUMBER_OF_ITEMS_KEY, 3)
        // Be explicit instead of implicitly returning the void put() result (null)
        return RepeatStatus.FINISHED
    }
}
// Bootstrap the Spring context from AppConfig and look up the job launcher
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(AppConfig)
JobLauncher launcher = ctx.getBean(JobLauncher)
/**
 * Prints the overall status of a job execution followed by one line per step
 * execution (id, step name, and the status unless it is COMPLETED).
 */
void showStatus(JobExecution jobExecution) {
    println "Status is: ${jobExecution.status}"
    println "Step executions: "
    for (StepExecution stepExec in jobExecution.stepExecutions) {
        // Completed steps get an empty suffix so only the other statuses stand out
        def statusSuffix = stepExec.status.toString() == 'COMPLETED' ? '' : stepExec.status
        println " ${stepExec.id}: ${stepExec.stepName} ${statusSuffix}"
    }
}
// Run the job twice with the same (empty) parameters: first with index 2
// configured to fail, then with no failing index.
try {
    for (Integer failingIndex in [2, null]) {
        AppConfig.failOnItem = failingIndex
        def jobExecution = launcher.run(ctx.getBean(Job), new JobParameters([:]))
        showStatus(jobExecution)
    }
} finally {
    // Close the application context even when a run throws
    ctx.close()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment