Skip to content

Instantly share code, notes, and snippets.

@fmbenhassine
Created November 2, 2020 12:22
Show Gist options
  • Save fmbenhassine/d34da4dda074c81a0cd4a1d2bcdfca38 to your computer and use it in GitHub Desktop.
Save fmbenhassine/d34da4dda074c81a0cd4a1d2bcdfca38 to your computer and use it in GitHub Desktop.
#SpringBatch MultiResourcePartitioner job sample
package org.springframework.batch.sample;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.partition.support.MultiResourcePartitioner;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.PassThroughLineMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
/**
 * Sample Spring Batch configuration showing a partitioned step driven by a
 * {@link MultiResourcePartitioner}.
 *
 * <p>The job has a single manager step which partitions the resources matched by the
 * {@code inputFiles} job parameter and hands each partition to {@code workerStep}.
 * The worker step reads the file assigned to it line by line and prints every line to
 * standard output.
 */
@Configuration
@EnableBatchProcessing
public class MultiResourcePartitionerSample {

    private final JobBuilderFactory jobs;

    private final StepBuilderFactory steps;

    /**
     * Creates the configuration with the builder factories it uses to define the job
     * and its steps.
     *
     * @param jobs factory used to build the job
     * @param steps factory used to build the manager and worker steps
     */
    public MultiResourcePartitionerSample(JobBuilderFactory jobs, StepBuilderFactory steps) {
        this.jobs = jobs;
        this.steps = steps;
    }

    /**
     * Defines the manager step, which partitions the input resources and runs the
     * worker step for the partitions through a {@link SimpleAsyncTaskExecutor}.
     *
     * @return the partitioned manager step
     */
    @Bean
    public Step managerStep() {
        return steps.get("managerStep")
                // null is only a placeholder: the partitioner is step scoped and its
                // argument is declared as a late-bound @Value expression.
                .partitioner(workerStep().getName(), partitioner(null))
                .step(workerStep())
                .taskExecutor(new SimpleAsyncTaskExecutor())
                .build();
    }

    /**
     * Defines the step-scoped partitioner over the resources named by the
     * {@code inputFiles} job parameter.
     *
     * @param resources the resources to partition, taken from
     *     {@code jobParameters['inputFiles']}
     * @return a partitioner configured with the given resources
     */
    @StepScope
    @Bean
    public MultiResourcePartitioner partitioner(@Value("#{jobParameters['inputFiles']}") Resource[] resources) {
        MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
        partitioner.setResources(resources);
        return partitioner;
    }

    /**
     * Defines the step-scoped reader used by the worker step. Each line of the resource
     * is passed through unchanged as a {@code String} item.
     *
     * @param resource the file to read, taken from the {@code fileName} entry of the
     *     step execution context (expected to be put there by the partitioner)
     * @return a flat file reader over the given resource
     */
    @StepScope
    @Bean
    public FlatFileItemReader<String> personFileReader(@Value("#{stepExecutionContext['fileName']}") Resource resource) {
        return new FlatFileItemReaderBuilder<String>()
                .name("itemReader")
                .resource(resource)
                .lineMapper(new PassThroughLineMapper())
                .build();
    }

    /**
     * Defines the worker step: reads lines in chunks of 5 and prints each one to
     * standard output.
     *
     * @return the chunk-oriented worker step
     */
    @Bean
    public Step workerStep() {
        return steps.get("workerStep")
                .<String, String>chunk(5)
                // null is only a placeholder: the reader is step scoped and its
                // argument is declared as a late-bound @Value expression.
                .reader(personFileReader(null))
                .writer(items -> items.forEach(System.out::println))
                .build();
    }

    /**
     * Defines the job, which consists of the manager step only.
     *
     * @return the sample job
     */
    @Bean
    public Job job() {
        return jobs.get("job")
                .start(managerStep())
                .build();
    }

    /**
     * Bootstraps the application context, launches the job against
     * {@code file:///tmp/data/*.csv} and closes the context afterwards.
     *
     * @param args command line arguments (unused)
     * @throws Exception if the job cannot be launched
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the context, so its singleton beans are destroyed
        // in an orderly way even when launching the job throws.
        // NOTE(review): this assumes jobLauncher.run(...) returns only after the job has
        // finished (a synchronous launcher) - confirm if a custom JobLauncher is used.
        try (AnnotationConfigApplicationContext context =
                new AnnotationConfigApplicationContext(MultiResourcePartitionerSample.class)) {
            JobLauncher jobLauncher = context.getBean(JobLauncher.class);
            Job job = context.getBean(Job.class);
            JobParameters jobParameters = new JobParametersBuilder()
                    .addString("inputFiles", "file:///tmp/data/*.csv")
                    .toJobParameters();
            jobLauncher.run(job, jobParameters);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment