Skip to content

Instantly share code, notes, and snippets.

@fmbenhassine
Last active May 17, 2020 20:54
Show Gist options
  • Save fmbenhassine/bfe2be7386b99ce496425fac9ff35fb8 to your computer and use it in GitHub Desktop.
Save fmbenhassine/bfe2be7386b99ce496425fac9ff35fb8 to your computer and use it in GitHub Desktop.
Spring Batch example of how to use a ClassifierCompositeItemWriter #SpringBatch https://stackoverflow.com/questions/53377660
package org.springframework.batch.sample;
import java.util.Arrays;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemWriterBuilder;
import org.springframework.batch.item.file.transform.PassThroughLineAggregator;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.classify.Classifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
@Configuration
@EnableBatchProcessing
public class MyJob {
private JobBuilderFactory jobBuilderFactory;
private StepBuilderFactory stepBuilderFactory;
public MyJob(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
}
@Bean
public ItemReader<Person> itemReader() {
Person foo1 = new Person();foo1.setId(1);foo1.setName("foo1");
Person foo2 = new Person();foo2.setId(2);foo2.setName("foo2");
Person bar1 = new Person();bar1.setId(3);bar1.setName("bar1");
Person bar2 = new Person();bar2.setId(4);bar2.setName("bar2");
return new ListItemReader<>(Arrays.asList(foo1, foo2, bar1, bar2));
}
@Bean
public ClassifierCompositeItemWriter<Person> classifierCompositeItemWriter(ItemWriter<Person> fooItemWriter, ItemWriter<Person> barItemWriter) {
ClassifierCompositeItemWriter<Person> classifierCompositeItemWriter = new ClassifierCompositeItemWriter<>();
classifierCompositeItemWriter.setClassifier((Classifier<Person, ItemWriter<? super Person>>) person -> {
if (person.getName().startsWith("foo")) {
return fooItemWriter;
} else {
return barItemWriter;
}
});
return classifierCompositeItemWriter;
}
@Bean
public FlatFileItemWriter<Person> fooItemWriter() {
return new FlatFileItemWriterBuilder<Person>()
.name("fooItemWriter")
.resource(new FileSystemResource("foos.txt"))
.lineAggregator(new PassThroughLineAggregator<>())
.build();
}
@Bean
public FlatFileItemWriter<Person> barItemWriter() {
return new FlatFileItemWriterBuilder<Person>()
.name("barItemWriter")
.resource(new FileSystemResource("bars.txt"))
.lineAggregator(new PassThroughLineAggregator<>())
.build();
}
@Bean
public Step dataExtractionStep() {
return stepBuilderFactory.get("dataExtractionStep")
.<Person, Person>chunk(2)
.reader(itemReader())
.writer(classifierCompositeItemWriter(fooItemWriter(), barItemWriter()))
.stream(fooItemWriter())
.stream(barItemWriter())
.build();
}
@Bean
public Job dataExtractionJob() {
return jobBuilderFactory.get("dataExtractionJob")
.start(dataExtractionStep())
.build();
}
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
}
public static class Person {
private int id;
private String name;
public Person() {
}
public Person(int id, String name) {
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "Person{" +
"id=" + id +
", name='" + name + '\'' +
'}';
}
}
}
package io.github.benas.sbi;
import java.util.Arrays;
import javax.xml.bind.annotation.XmlRootElement;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.item.xml.StaxEventItemWriter;
import org.springframework.classify.Classifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
@Configuration
@EnableBatchProcessing
public class XmlJobConfig {
private JobBuilderFactory jobBuilderFactory;
private StepBuilderFactory stepBuilderFactory;
public XmlJobConfig(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory) {
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
}
@Bean
public ItemReader<Person> itemReader() {
Person foo1 = new Person();foo1.setId(1);foo1.setName("foo1");
Person foo2 = new Person();foo2.setId(2);foo2.setName("foo2");
Person bar1 = new Person();bar1.setId(3);bar1.setName("bar1");
Person bar2 = new Person();bar2.setId(4);bar2.setName("bar2");
return new ListItemReader<>(Arrays.asList(foo1, foo2, bar1, bar2));
}
@Bean
public ClassifierCompositeItemWriter<Person> classifierCompositeItemWriter(
ItemWriter<Person> fooItemWriter,
ItemWriter<Person> barItemWriter
) {
ClassifierCompositeItemWriter<Person> classifierCompositeItemWriter = new ClassifierCompositeItemWriter<>();
classifierCompositeItemWriter.setClassifier(new Classifier<Person, ItemWriter<? super Person>>() {
@Override
public ItemWriter<? super Person> classify(Person person) {
if (person.getName().startsWith("foo")) {
return fooItemWriter;
} else {
return barItemWriter;
}
}
});
return classifierCompositeItemWriter;
}
@Bean
public StaxEventItemWriter<Person> fooItemWriter() {
StaxEventItemWriter<Person> itemWriter = new StaxEventItemWriter<>();
itemWriter.setRootTagName("persons");
itemWriter.setResource(new FileSystemResource("foos.xml"));
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setClassesToBeBound(Person.class);
itemWriter.setMarshaller(marshaller);
return itemWriter;
}
@Bean
public StaxEventItemWriter<Person> barItemWriter() {
StaxEventItemWriter<Person> itemWriter = new StaxEventItemWriter<>();
itemWriter.setRootTagName("persons");
itemWriter.setResource(new FileSystemResource("bars.xml"));
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setClassesToBeBound(Person.class);
itemWriter.setMarshaller(marshaller);
return itemWriter;
}
@Bean
public Step dataExtractionStep() {
return stepBuilderFactory.get("dataExtractionStep")
.<Person, Person>chunk(2)
.reader(itemReader())
.writer(classifierCompositeItemWriter(fooItemWriter(), barItemWriter()))
.stream(fooItemWriter())
.stream(barItemWriter())
.build();
}
@Bean
public Job dataExtractionJob() {
return jobBuilderFactory.get("dataExtractionJob")
.start(dataExtractionStep())
.build();
}
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(XmlJobConfig.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
}
@XmlRootElement
public static class Person {
private int id;
private String name;
public Person() {
}
public Person(int id, String name) {
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}
@jovb
Copy link

jovb commented Mar 22, 2019

¡ great Job ! Thks for this code. Works very good for me.

@tommy3531
Copy link

Is it possible to have each itemwriter write to the same file

	public FlatFileItemWriter<Person> fooItemWriter() {
		return new FlatFileItemWriterBuilder<Person>()
				.name("fooItemWriter")
				.resource(new FileSystemResource("foos.txt"))
				.lineAggregator(new PassThroughLineAggregator<>())
				.build();
	}

	@Bean
	public FlatFileItemWriter<Person> barItemWriter() {
		return new FlatFileItemWriterBuilder<Person>()
				.name("barItemWriter")
				.resource(new FileSystemResource("foos.txt"))
				.lineAggregator(new PassThroughLineAggregator<>())
				.build();
	}
``

@fmbenhassine
Copy link
Author

@tommy3531

Is it possible to have each itemwriter write to the same file

That's not the goal in the first place when using a classifier composite item writer. The goal is to classify items and write them in different files. But nothing prevents you from doing it. However I'm not sure if it would work, try and see.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment