Skip to content

Instantly share code, notes, and snippets.

@fmbenhassine
Last active February 9, 2025 09:05
Show Gist options
  • Save fmbenhassine/e2867f483ddd96517e421e82ea2c9481 to your computer and use it in GitHub Desktop.
Save fmbenhassine/e2867f483ddd96517e421e82ea2c9481 to your computer and use it in GitHub Desktop.
#SpringBatch aggregation job sample
package com.example.demo;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.*;
import org.springframework.batch.item.support.IteratorItemReader;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.support.JdbcTransactionManager;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;
/**
 * Spring Batch configuration for a sample job that aggregates a filtered set of metrics.
 *
 * <p>The job consists of a single chunk-oriented step that reads a fixed list of
 * {@link Metric} items, filters out the ones that fall outside the selected time window
 * and value range, and accumulates the sum of the remaining values under the
 * {@code "sum"} key of the step's execution context, so that the result can be read
 * once the job has run.
 */
@Configuration
public class AggregationJobConfiguration {

    /** A measured value together with the timestamp associated with it. */
    record Metric(int value, long timestamp) { }

    // items: held as an immutable list rather than a Stream, because a Stream can only be
    // traversed once and would throw IllegalStateException if step(...) ever ran a second time
    final List<Metric> items = List.of(
            new Metric(1, 1000),
            new Metric(2, 2000),
            new Metric(3, 3000),
            new Metric(4, 4000),
            new Metric(5, 5000),
            new Metric(6, 6000)
    );

    /**
     * Defines the job named {@code "job"}, made of the single aggregation step.
     *
     * @param jobRepository repository the job is built against
     * @param step the aggregation step to start the job with
     * @return the assembled job
     */
    @Bean
    public Job job(JobRepository jobRepository, Step step) {
        return new JobBuilder("job", jobRepository).start(step).build();
    }

    /**
     * Defines the chunk-oriented step (chunk size 2) that reads, filters and sums the metrics.
     *
     * @param jobRepository repository the step is built against
     * @param transactionManager transaction manager used for the chunk transactions
     * @return the assembled step
     */
    @Bean
    public Step step(JobRepository jobRepository, JdbcTransactionManager transactionManager) {
        // item reader
        IteratorItemReader<Metric> reader = new IteratorItemReader<>(items.iterator());

        // item processor (filter items as needed, for example select the time window and/or the range of values)
        ItemProcessor<Metric, Metric> processor = item -> {
            boolean inTimeWindow = item.timestamp() > 2000 && item.timestamp() < 6000;
            boolean inValueRange = item.value() > 3;
            // returning null tells Spring Batch to filter the item out, so it never reaches the writer
            return inTimeWindow && inValueRange ? item : null;
        };

        // item writer
        ItemWriter<Metric> writer = new ItemStreamWriter<>() {

            // execution context handed over in open(); the running sum is stored in it
            private ExecutionContext executionContext;

            @Override
            public void open(ExecutionContext executionContext) throws ItemStreamException {
                this.executionContext = executionContext;
            }

            @Override
            public void write(Chunk<? extends Metric> chunk) {
                // do the calculation and save it in the step's execution context for later use outside the job
                for (Metric metric : chunk) {
                    int currentSum = executionContext.getInt("sum", 0);
                    executionContext.putInt("sum", currentSum + metric.value());
                }
            }
        };

        return new StepBuilder("step", jobRepository)
                .<Metric, Metric>chunk(2, transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }
}
package com.example.demo;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
/**
 * Boots the application, which runs the aggregation job on startup, and then prints the
 * sum that the job stored in the execution context of its step.
 */
@SpringBootApplication
public class DemoApplication {

    /**
     * Runs the application and prints {@code sum = <value>} for the latest execution of
     * the job named {@code "job"}.
     *
     * @param args command line arguments, passed through to Spring Boot
     * @throws IllegalStateException if no execution (or no step execution) of the job can be found
     */
    public static void main(String[] args) {
        // run the job and get the execution
        ConfigurableApplicationContext applicationContext = SpringApplication.run(DemoApplication.class, args);
        JobExplorer jobExplorer = applicationContext.getBean(JobExplorer.class);

        // look the execution up by job name instead of assuming its id is 1, which only
        // holds when the job repository starts out empty
        var jobInstance = jobExplorer.getLastJobInstance("job");
        JobExecution jobExecution = (jobInstance == null) ? null : jobExplorer.getLastJobExecution(jobInstance);
        if (jobExecution == null) {
            throw new IllegalStateException("No execution found for job 'job'");
        }

        // get the execution context of the step
        var stepExecutions = jobExecution.getStepExecutions();
        if (stepExecutions.isEmpty()) {
            throw new IllegalStateException(
                    "Job execution " + jobExecution.getId() + " has no step execution");
        }
        ExecutionContext executionContext = stepExecutions.iterator().next().getExecutionContext();

        // get the calculation from the context; the key is absent when no item reached the
        // writer, in which case the sum is 0
        int sum = executionContext.getInt("sum", 0);
        System.out.println("sum = " + sum);
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the demo project containing the Spring Batch aggregation job sample. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Inherits from the Spring Boot starter parent, version 3.4.2. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.4.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<!-- Coordinates of this project. -->
<groupId>com.example</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo</name>
<description>Demo project for Spring Boot</description>
<!-- The project metadata elements below (url, licenses, developers, scm) are left empty. -->
<url/>
<licenses>
<license/>
</licenses>
<developers>
<developer/>
</developers>
<scm>
<connection/>
<developerConnection/>
<tag/>
<url/>
</scm>
<properties>
<!-- Java language level the project is built for. -->
<java.version>17</java.version>
</properties>
<!-- No dependency declares a version here; presumably versions are managed by the parent POM (to be confirmed). -->
<dependencies>
<!-- Spring Batch support through the Spring Boot starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- H2 database, needed at runtime only. -->
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Test-scoped dependencies. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Spring Boot Maven plugin. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
@fmbenhassine
Copy link
Author

prints:

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/

 :: Spring Boot ::                (v3.4.2)

2025-02-09T09:43:59.668+01:00  INFO 806 --- [demo] [           main] com.example.demo.DemoApplication         : Starting DemoApplication using Java 21.0.2 with PID 806 (/Users/mbh-broadcom/Downloads/demo-batch-aggregation/target/classes started by mbh-broadcom in /Users/mbh-broadcom/Downloads/demo-batch-aggregation)
2025-02-09T09:43:59.670+01:00  INFO 806 --- [demo] [           main] com.example.demo.DemoApplication         : No active profile set, falling back to 1 default profile: "default"
2025-02-09T09:44:00.142+01:00  INFO 806 --- [demo] [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2025-02-09T09:44:00.263+01:00  INFO 806 --- [demo] [           main] com.zaxxer.hikari.pool.HikariPool        : HikariPool-1 - Added connection conn0: url=jdbc:h2:mem:434af61e-b065-457a-ad77-979f0451f347 user=SA
2025-02-09T09:44:00.264+01:00  INFO 806 --- [demo] [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2025-02-09T09:44:00.475+01:00  INFO 806 --- [demo] [           main] com.example.demo.DemoApplication         : Started DemoApplication in 1.103 seconds (process running for 1.614)
2025-02-09T09:44:00.477+01:00  INFO 806 --- [demo] [           main] o.s.b.a.b.JobLauncherApplicationRunner   : Running default command line with: []
2025-02-09T09:44:00.505+01:00  INFO 806 --- [demo] [           main] o.s.b.c.l.s.TaskExecutorJobLauncher      : Job: [SimpleJob: [name=job]] launched with the following parameters: [{}]
2025-02-09T09:44:00.521+01:00  INFO 806 --- [demo] [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step]
2025-02-09T09:44:00.536+01:00  INFO 806 --- [demo] [           main] o.s.batch.core.step.AbstractStep         : Step: [step] executed in 15ms
2025-02-09T09:44:00.541+01:00  INFO 806 --- [demo] [           main] o.s.b.c.l.s.TaskExecutorJobLauncher      : Job: [SimpleJob: [name=job]] completed with the following parameters: [{}] and the following status: [COMPLETED] in 29ms
sum = 9
2025-02-09T09:44:00.552+01:00  INFO 806 --- [demo] [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2025-02-09T09:44:00.569+01:00  INFO 806 --- [demo] [ionShutdownHook] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.

Process finished with exit code 0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment