Created
February 19, 2016 17:51
-
-
Save cataphract/e2d3f6cec0d85d91120a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.springframework.batch.core.* | |
@Grab('org.springframework.batch:spring-batch-core:3.0.6.RELEASE') | |
import org.springframework.batch.core.configuration.annotation.* | |
import org.springframework.batch.core.job.builder.FlowBuilder | |
import org.springframework.batch.core.job.flow.Flow | |
import org.springframework.batch.core.launch.JobLauncher | |
import org.springframework.batch.core.listener.ExecutionContextPromotionListener | |
import org.springframework.batch.core.partition.support.Partitioner | |
import org.springframework.batch.core.scope.context.ChunkContext | |
import org.springframework.batch.core.scope.context.StepSynchronizationManager | |
import org.springframework.batch.core.step.tasklet.Tasklet | |
import org.springframework.batch.item.ExecutionContext | |
import org.springframework.batch.repeat.RepeatStatus | |
import org.springframework.beans.factory.annotation.Autowired | |
import org.springframework.beans.factory.annotation.Value | |
import org.springframework.context.annotation.AnnotationConfigApplicationContext | |
import org.springframework.context.annotation.Bean | |
import org.springframework.context.annotation.Configuration | |
/**
 * Spring Batch configuration for the demo job: 'step1' stores the number of
 * items in the job execution context, then 'partitionedStep' runs
 * 'repeatedStep' once per partition index.
 */
@Configuration
@EnableBatchProcessing
class AppConfig {
    /** Execution-context key under which the number of partitions is stored. */
    public final static String NUMBER_OF_ITEMS_KEY = 'NUMBER_OF_ITEMS'

    /** Partition index on which PerIndexTasklet throws; null means never fail. */
    public static Integer failOnItem = null

    @Autowired
    private JobBuilderFactory jobs

    @Autowired
    private StepBuilderFactory steps

    /**
     * Builds 'myJob' as a flow: step1 followed by the wrapped partitioned step.
     */
    @Bean
    public Job job() {
        // The argument is a placeholder; the bean method is job-scoped and its
        // parameter carries a @Value expression.
        Step wrappedPartitionedStep = new RegisterStepScopeOnExecutionStep(
                name: 'partitionedStep',
                delegate: partitionedStep(null))

        Flow mainFlow = new FlowBuilder<Flow>('mainFlow')
                .start(step1())
                .next(wrappedPartitionedStep)
                .build()

        jobs.get('myJob')
                .start(mainFlow)
                .build().build()
    }

    /**
     * Step that runs FindNumberOfJobsTasklet and promotes NUMBER_OF_ITEMS_KEY
     * through an ExecutionContextPromotionListener.
     */
    private Step step1() {
        this.steps.get('step1')
                .tasklet(new FindNumberOfJobsTasklet())
                .listener(new ExecutionContextPromotionListener(keys: [NUMBER_OF_ITEMS_KEY]))
                .build()
    }

    /**
     * Job-scoped partitioned step creating one partition per index in 1..n.
     *
     * @param n number of partitions, read from the job execution context
     */
    @Bean
    @JobScope
    Step partitionedStep(@Value('#{jobExecutionContext["NUMBER_OF_ITEMS"]}') n) {
        Partitioner partitioner = { int gridSize ->
            // Groovy's (1..0) is the reverse range [1, 0], not an empty one,
            // so a non-positive n must be mapped to "no partitions" explicitly.
            n > 0 ? (1..n).collectEntries { i ->
                ['partition_' + i, new ExecutionContext(partitionIndex: i)]
            } : [:]
        } as Partitioner

        steps.get('partitionedStep')
                .partitioner(repeatedStep())
                .partitioner('partitionedStep', partitioner)
                .gridSize(n)
                .build()
    }

    /** Step executed for each partition; runs the step-scoped PerIndexTasklet. */
    @Bean
    Step repeatedStep() {
        steps.get('repeatedStep').tasklet(perIndexTasklet()).build()
    }

    /** Step-scoped tasklet, so its partition index is injected per step execution. */
    @Bean
    @StepScope
    PerIndexTasklet perIndexTasklet() {
        new PerIndexTasklet()
    }
}
/**
 * Step decorator that reports a fixed name and registers the step execution
 * with StepSynchronizationManager around the delegate's execute() call.
 * All other Step methods are forwarded to the delegate via @Delegate.
 */
class RegisterStepScopeOnExecutionStep implements Step {
    @Delegate
    Step delegate

    String name

    // Spring-batch likes to call getName() when building the job.
    // Override this so that we avoid the scoped target bean from being
    // instantiated before it should
    @Override
    String getName() {
        name
    }

    // Unfortunately the task executors don't properly propagate the
    // job or the step scope. Force propagation of step scope.
    // See https://jira.spring.io/browse/BATCH-2269
    @Override
    void execute(StepExecution stepExecution) throws JobInterruptedException {
        // Register before entering the try block so that close() is only
        // reached when the registration itself went through.
        StepSynchronizationManager.register(stepExecution)
        try {
            delegate.execute(stepExecution)
        } finally {
            StepSynchronizationManager.close()
        }
    }
}
/**
 * Tasklet run for a single partition. Throws when its index equals
 * AppConfig.failOnItem, otherwise prints a completion line.
 */
class PerIndexTasklet implements Tasklet {
    /** Partition index, injected from the step execution context. */
    @Value("#{stepExecutionContext['partitionIndex']}")
    int i

    /**
     * @return RepeatStatus.FINISHED once the index has been handled
     * @throws RuntimeException when i equals AppConfig.failOnItem
     */
    @Override
    RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        if (i == AppConfig.failOnItem) {
            throw new RuntimeException("Failing on index $i")
        }
        println "Finish index $i"
        // State the outcome explicitly instead of implicitly returning the
        // null produced by println as the method's last expression.
        RepeatStatus.FINISHED
    }
}
/**
 * Tasklet that stores the (hard-coded) number of items under
 * AppConfig.NUMBER_OF_ITEMS_KEY in the step's execution context.
 */
class FindNumberOfJobsTasklet implements Tasklet {
    /**
     * @return RepeatStatus.FINISHED after the value has been stored
     */
    @Override
    RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        def stepExecutionContext = chunkContext.stepContext.stepExecution.executionContext
        stepExecutionContext.put(AppConfig.NUMBER_OF_ITEMS_KEY, 3)
        // put() yields nothing; return the status explicitly rather than an
        // implicit null.
        RepeatStatus.FINISHED
    }
}
/**
 * Prints the job status followed by one line per step execution; the step
 * status is only shown when it is not COMPLETED.
 */
void showStatus(JobExecution jobExecution) {
    println "Status is: ${jobExecution.status}"
    println "Step executions: "
    jobExecution.stepExecutions.each { StepExecution exec ->
        println " ${exec.id}: ${exec.stepName} ${exec.status == BatchStatus.COMPLETED ? '' : exec.status}"
    }
}

def ctx = new AnnotationConfigApplicationContext(AppConfig)
try {
    def launcher = ctx.getBean(JobLauncher)

    // First run: make partition index 2 fail.
    AppConfig.failOnItem = 2
    def jobExecution = launcher.run(ctx.getBean(Job), new JobParameters([:]))
    showStatus(jobExecution)

    // Second run with the same (empty) parameters and no forced failure.
    AppConfig.failOnItem = null
    jobExecution = launcher.run(ctx.getBean(Job), new JobParameters([:]))
    showStatus(jobExecution)
} finally {
    // Release the application context even when a run throws.
    ctx.close()
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.springframework.batch.core.* | |
@Grab('org.springframework.batch:spring-batch-core:3.0.6.RELEASE') | |
import org.springframework.batch.core.configuration.annotation.* | |
import org.springframework.batch.core.job.builder.FlowBuilder | |
import org.springframework.batch.core.job.flow.Flow | |
import org.springframework.batch.core.launch.JobLauncher | |
import org.springframework.batch.core.listener.ExecutionContextPromotionListener | |
import org.springframework.batch.core.partition.support.Partitioner | |
import org.springframework.batch.core.scope.context.ChunkContext | |
import org.springframework.batch.core.scope.context.StepSynchronizationManager | |
import org.springframework.batch.core.step.tasklet.Tasklet | |
import org.springframework.batch.item.ExecutionContext | |
import org.springframework.batch.repeat.RepeatStatus | |
import org.springframework.beans.factory.annotation.Autowired | |
import org.springframework.beans.factory.annotation.Value | |
import org.springframework.context.annotation.AnnotationConfigApplicationContext | |
import org.springframework.context.annotation.Bean | |
import org.springframework.context.annotation.Configuration | |
/**
 * Spring Batch configuration for the demo job: 'step1' stores the number of
 * items in the job execution context, then 'partitionedStep' runs a
 * step-scoped flow once per partition index, each with its own step name.
 */
@Configuration
@EnableBatchProcessing
class AppConfig {
    /** Execution-context key under which the number of partitions is stored. */
    public final static String NUMBER_OF_ITEMS_KEY = 'NUMBER_OF_ITEMS'

    /** Partition index on which PerIndexTasklet throws; null means never fail. */
    public static Integer failOnItem = null

    @Autowired
    private JobBuilderFactory jobs

    @Autowired
    private StepBuilderFactory steps

    /**
     * Builds 'myJob' as a flow: step1 followed by the wrapped partitioned step.
     */
    @Bean
    public Job job() {
        // The argument is a placeholder; the bean method is job-scoped and its
        // parameter carries a @Value expression.
        Step wrappedPartitionedStep = new RegisterStepScopeOnExecutionStep(
                name: 'partitionedStep',
                delegate: partitionedStep(null))

        Flow mainFlow = new FlowBuilder<Flow>('mainFlow')
                .start(step1())
                .next(wrappedPartitionedStep)
                .build()

        jobs.get('myJob')
                .start(mainFlow)
                .build().build()
    }

    /**
     * Step that runs FindNumberOfJobsTasklet and promotes NUMBER_OF_ITEMS_KEY
     * through an ExecutionContextPromotionListener.
     */
    private Step step1() {
        this.steps.get('step1')
                .tasklet(new FindNumberOfJobsTasklet())
                .listener(new ExecutionContextPromotionListener(keys: [NUMBER_OF_ITEMS_KEY]))
                .build()
    }

    /**
     * Job-scoped partitioned step creating one partition per index in 1..n.
     * The partitioned work is a flow step wrapped in OnlyDelegateExecuteStep.
     *
     * @param n number of partitions, read from the job execution context
     */
    @Bean
    @JobScope
    Step partitionedStep(@Value('#{jobExecutionContext["NUMBER_OF_ITEMS"]}') n) {
        Partitioner partitioner = { int gridSize ->
            // Groovy's (1..0) is the reverse range [1, 0], not an empty one,
            // so a non-positive n must be mapped to "no partitions" explicitly.
            n > 0 ? (1..n).collectEntries { i ->
                ['partition_' + i, new ExecutionContext(partitionIndex: i)]
            } : [:]
        } as Partitioner

        Step repeatedStep = new OnlyDelegateExecuteStep(
                name: 'partitionedStep',
                delegate: steps.get('partitionedStep').flow(repeatedStepFlow(null)).build())

        steps.get('partitionedStep')
                .partitioner(repeatedStep)
                .partitioner('partitionedStep', partitioner)
                .gridSize(n)
                .build()
    }

    /**
     * Step-scoped flow containing a single tasklet step whose name embeds the
     * partition index.
     *
     * @param partitionIndex index read from the step execution context
     */
    @Bean
    @StepScope
    Flow repeatedStepFlow(@Value("#{stepExecutionContext['partitionIndex']}") Integer partitionIndex) {
        new FlowBuilder('repeatedStepFlow')
                .start(steps.get("repeatedStep-$partitionIndex").tasklet(perIndexTasklet()).build())
                .build()
    }

    /** Step-scoped tasklet, so its partition index is injected per step execution. */
    @Bean
    @StepScope
    PerIndexTasklet perIndexTasklet() {
        new PerIndexTasklet()
    }
}
/**
 * Step decorator that reports a fixed name and registers the step execution
 * with StepSynchronizationManager around the delegate's execute() call.
 * All other Step methods are forwarded to the delegate via @Delegate.
 */
class RegisterStepScopeOnExecutionStep implements Step {
    @Delegate
    Step delegate

    String name

    // Spring-batch likes to call getName() when building the job.
    // Override this so that we avoid the scoped target bean from being
    // instantiated before it should
    @Override
    String getName() {
        name
    }

    // Unfortunately the task executors don't properly propagate the
    // job or the step scope. Force propagation of step scope.
    // See https://jira.spring.io/browse/BATCH-2269
    @Override
    void execute(StepExecution stepExecution) throws JobInterruptedException {
        // Register before entering the try block so that close() is only
        // reached when the registration itself went through.
        StepSynchronizationManager.register(stepExecution)
        try {
            delegate.execute(stepExecution)
        } finally {
            StepSynchronizationManager.close()
        }
    }
}
/**
 * Step wrapper that forwards only execute() to the delegate. The remaining
 * Step properties are answered from this object's own fields.
 */
class OnlyDelegateExecuteStep implements Step {
    // In a partitioned context, we need to prevent calls to these
    // three from instantiating the scoped bean
    String name
    boolean allowStartIfComplete = false
    int startLimit = Integer.MAX_VALUE

    /** The wrapped step; only touched when the step is actually executed. */
    Step delegate

    @Override
    void execute(StepExecution stepExecution) throws JobInterruptedException {
        this.delegate.execute(stepExecution)
    }
}
/**
 * Tasklet run for a single partition. Throws when its index equals
 * AppConfig.failOnItem, otherwise prints a completion line.
 */
class PerIndexTasklet implements Tasklet {
    /** Partition index, injected from the step execution context. */
    @Value("#{stepExecutionContext['partitionIndex']}")
    int i

    /**
     * @return RepeatStatus.FINISHED once the index has been handled
     * @throws RuntimeException when i equals AppConfig.failOnItem
     */
    @Override
    RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        if (i == AppConfig.failOnItem) {
            throw new RuntimeException("Failing on index $i")
        }
        println "Finish index $i"
        // State the outcome explicitly instead of implicitly returning the
        // null produced by println as the method's last expression.
        RepeatStatus.FINISHED
    }
}
/**
 * Tasklet that stores the (hard-coded) number of items under
 * AppConfig.NUMBER_OF_ITEMS_KEY in the step's execution context.
 */
class FindNumberOfJobsTasklet implements Tasklet {
    /**
     * @return RepeatStatus.FINISHED after the value has been stored
     */
    @Override
    RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        def stepExecutionContext = chunkContext.stepContext.stepExecution.executionContext
        stepExecutionContext.put(AppConfig.NUMBER_OF_ITEMS_KEY, 3)
        // put() yields nothing; return the status explicitly rather than an
        // implicit null.
        RepeatStatus.FINISHED
    }
}
/**
 * Prints the job status followed by one line per step execution; the step
 * status is only shown when it is not COMPLETED.
 */
void showStatus(JobExecution jobExecution) {
    println "Status is: ${jobExecution.status}"
    println "Step executions: "
    jobExecution.stepExecutions.each { StepExecution exec ->
        println " ${exec.id}: ${exec.stepName} ${exec.status == BatchStatus.COMPLETED ? '' : exec.status}"
    }
}

def ctx = new AnnotationConfigApplicationContext(AppConfig)
try {
    def launcher = ctx.getBean(JobLauncher)

    // First run: make partition index 2 fail.
    AppConfig.failOnItem = 2
    def jobExecution = launcher.run(ctx.getBean(Job), new JobParameters([:]))
    showStatus(jobExecution)

    // Second run with the same (empty) parameters and no forced failure.
    AppConfig.failOnItem = null
    jobExecution = launcher.run(ctx.getBean(Job), new JobParameters([:]))
    showStatus(jobExecution)
} finally {
    // Release the application context even when a run throws.
    ctx.close()
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.springframework.batch.core.* | |
@Grab('org.springframework.batch:spring-batch-core:3.0.6.RELEASE') | |
import org.springframework.batch.core.Job | |
import org.springframework.batch.core.configuration.annotation.* | |
import org.springframework.batch.core.job.builder.FlowBuilder | |
import org.springframework.batch.core.job.flow.Flow | |
import org.springframework.batch.core.job.flow.FlowExecutionStatus | |
import org.springframework.batch.core.job.flow.JobExecutionDecider | |
import org.springframework.batch.core.launch.JobLauncher | |
import org.springframework.batch.core.listener.ExecutionContextPromotionListener | |
import org.springframework.batch.core.scope.context.ChunkContext | |
import org.springframework.batch.core.step.tasklet.Tasklet | |
import org.springframework.batch.repeat.RepeatStatus | |
import org.springframework.beans.factory.annotation.Autowired | |
import org.springframework.beans.factory.annotation.Value | |
import org.springframework.context.annotation.AnnotationConfigApplicationContext | |
import org.springframework.context.annotation.Bean | |
import org.springframework.context.annotation.Configuration | |
/**
 * Spring Batch configuration for the decider-driven variant of the demo job:
 * after 'step1', a job-scoped Decider chooses between continuing with
 * 'wrappingStep' and completing.
 */
@Configuration
@EnableBatchProcessing
class AppConfig {
    /** Execution-context key under which the number of items is stored. */
    public final static String NUMBER_OF_ITEMS_KEY = 'NUMBER_OF_ITEMS'

    /** Index on which PerIndexTasklet throws; null means never fail. */
    public static Integer failOnItem = null

    @Autowired
    private JobBuilderFactory jobs

    @Autowired
    private StepBuilderFactory steps

    /**
     * Builds 'myJob': step1, then decider -> wrappingStep transitions on the
     * 'CONTINUE' status.
     */
    @Bean
    public Job job() {
        def innerFlowStep = steps.get('wrappingStep')
                .flow(stepWrappingFlow())
                .allowStartIfComplete(true)
                .build()
        Step wrappingStep = new OverriddenNameStep(name: 'wrappingStep', delegate: innerFlowStep)

        def flowJobBuilder = jobs.get("myJob")
                .start(step1())
                .next(decider())
                .on('CONTINUE')
                .to(wrappingStep)
                .next(decider())
                .on('CONTINUE') // we need to repeat these two lines
                .to(wrappingStep)
                .build()
        return flowJobBuilder.build()
    }

    /**
     * Step that runs FindNumberOfJobsTasklet and promotes NUMBER_OF_ITEMS_KEY
     * through an ExecutionContextPromotionListener.
     */
    private Step step1() {
        def promotionListener = new ExecutionContextPromotionListener(keys: [NUMBER_OF_ITEMS_KEY])
        return this.steps.get('step1')
                .tasklet(new FindNumberOfJobsTasklet())
                .listener(promotionListener)
                .build()
    }

    /** Job-scoped decider holding the current index. */
    @Bean
    @JobScope
    Decider decider() {
        return new Decider()
    }

    /** Step-scoped flow whose only element is the step-scoped repeated step. */
    @Bean
    @StepScope
    Flow stepWrappingFlow() {
        def flowBuilder = new FlowBuilder('stepWrapperFlow')
        return flowBuilder.start(repeatedStep(null)).build()
    }

    /**
     * Step-scoped tasklet step whose name embeds the decider's current index.
     *
     * @param currentIndex value of the decider's 'cur' property
     */
    @Bean
    @StepScope
    Step repeatedStep(@Value('#{decider.cur}') Integer currentIndex) {
        return steps.get("repeated-$currentIndex")
                .tasklet(perIndexTasklet())
                .build()
    }

    /** Step-scoped tasklet, so its index is injected per step execution. */
    @Bean
    @StepScope
    PerIndexTasklet perIndexTasklet() {
        return new PerIndexTasklet()
    }
}
/**
 * Step wrapper that answers getName() from its own 'name' property and
 * forwards the remaining Step methods to the delegate via @Delegate.
 */
class OverriddenNameStep implements Step {
    String name

    @Delegate
    Step delegate

    /** Returns the locally configured name instead of asking the delegate. */
    @Override
    String getName() {
        return this.name
    }
}
/**
 * Job-scoped decider that advances a counter on every call and answers
 * 'CONTINUE' until the counter exceeds the configured number of items.
 */
class Decider implements JobExecutionDecider { // job scoped bean
    /** Number of items, injected from the job execution context. */
    @Value('#{jobExecutionContext["NUMBER_OF_ITEMS"]}')
    Integer n

    /** Index of the current iteration; incremented by each decide() call. */
    int cur = 0

    @Override
    FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        cur += 1
        return cur > n ? FlowExecutionStatus.COMPLETED : new FlowExecutionStatus('CONTINUE')
    }
}
/**
 * Tasklet run for a single index. Throws when its index equals
 * AppConfig.failOnItem, otherwise prints a completion line.
 */
class PerIndexTasklet implements Tasklet {
    /** Current index, injected from the 'decider' bean's 'cur' property. */
    @Value('#{decider.cur}')
    int i

    /**
     * @return RepeatStatus.FINISHED once the index has been handled
     * @throws RuntimeException when i equals AppConfig.failOnItem
     */
    @Override
    RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        if (i == AppConfig.failOnItem) {
            throw new RuntimeException("Failing on index $i")
        }
        println "Finish index $i"
        // State the outcome explicitly instead of implicitly returning the
        // null produced by println as the method's last expression.
        RepeatStatus.FINISHED
    }
}
/**
 * Tasklet that stores the (hard-coded) number of items under
 * AppConfig.NUMBER_OF_ITEMS_KEY in the step's execution context.
 */
class FindNumberOfJobsTasklet implements Tasklet {
    /**
     * @return RepeatStatus.FINISHED after the value has been stored
     */
    @Override
    RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        def stepExecutionContext = chunkContext.stepContext.stepExecution.executionContext
        stepExecutionContext.put(AppConfig.NUMBER_OF_ITEMS_KEY, 3)
        // put() yields nothing; return the status explicitly rather than an
        // implicit null.
        RepeatStatus.FINISHED
    }
}
/**
 * Prints the job status followed by one line per step execution; the step
 * status is only shown when it is not COMPLETED.
 */
void showStatus(JobExecution jobExecution) {
    println "Status is: ${jobExecution.status}"
    println "Step executions: "
    jobExecution.stepExecutions.each { StepExecution exec ->
        println " ${exec.id}: ${exec.stepName} ${exec.status == BatchStatus.COMPLETED ? '' : exec.status}"
    }
}

def ctx = new AnnotationConfigApplicationContext(AppConfig)
try {
    def launcher = ctx.getBean(JobLauncher)

    // First run: make index 2 fail.
    AppConfig.failOnItem = 2
    def jobExecution = launcher.run(ctx.getBean(Job), new JobParameters([:]))
    showStatus(jobExecution)

    // Second run with the same (empty) parameters and no forced failure.
    AppConfig.failOnItem = null
    jobExecution = launcher.run(ctx.getBean(Job), new JobParameters([:]))
    showStatus(jobExecution)
} finally {
    // Release the application context even when a run throws.
    ctx.close()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment