Skip to content

Instantly share code, notes, and snippets.

@fmbenhassine
Last active May 24, 2018 20:37
Show Gist options
  • Save fmbenhassine/98a0b48b563d190814acbaa20cad662c to your computer and use it in GitHub Desktop.
Spring Batch "remote" chunking sample with embedded JMS broker #SpringBatch
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.sample;
import java.util.Arrays;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.step.item.ChunkProcessor;
import org.springframework.batch.core.step.item.SimpleChunkProcessor;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter;
import org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler;
import org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.sample.config.JobRunnerConfiguration;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;
import org.springframework.messaging.PollableChannel;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
 * Functional test of a remote chunking job that runs against an embedded JMS broker.
 *
 * <p>The master step of the tested job will read data and send chunks to the worker
 * (started in {@link RemoteChunkingJobFunctionalTests#setUp()}) for processing and writing.
 * The master side is wired by {@link MasterConfiguration} (loaded through the Spring test
 * context); the worker side is wired by {@link WorkerConfiguration} (loaded in a separate
 * application context created for each test).
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = {
        JobRunnerConfiguration.class,
        RemoteChunkingJobFunctionalTests.MasterConfiguration.class})
public class RemoteChunkingJobFunctionalTests {

    /** URL the embedded broker listens on; also used by the master and worker connection factories. */
    private static final String BROKER_URL = "tcp://localhost:61616";

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    /** Embedded broker, started before and stopped after each test. */
    private BrokerService brokerService;

    /** Application context hosting the worker beans, created before and closed after each test. */
    private AnnotationConfigApplicationContext workerApplicationContext;

    /**
     * Starts the embedded broker on {@link #BROKER_URL}, then boots the worker application
     * context from {@link WorkerConfiguration}.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    @Before
    public void setUp() throws Exception {
        this.brokerService = new BrokerService();
        this.brokerService.addConnector(BROKER_URL);
        this.brokerService.start();
        this.workerApplicationContext = new AnnotationConfigApplicationContext(WorkerConfiguration.class);
    }

    /**
     * Closes the worker application context and stops the embedded broker.
     *
     * <p>JUnit runs this method even when {@link #setUp()} failed part-way, so either field
     * may still be {@code null}. The broker is stopped in a {@code finally} block so that a
     * failure while closing the worker context cannot leave the broker running (and its
     * port bound) for subsequent tests.
     *
     * @throws Exception if closing the context or stopping the broker fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (this.workerApplicationContext != null) {
                this.workerApplicationContext.close();
            }
        }
        finally {
            if (this.brokerService != null) {
                this.brokerService.stop();
            }
        }
    }

    /**
     * Launches the job and checks that it completed and reported two chunk results.
     *
     * @throws Exception if the job cannot be launched
     */
    @Test
    public void testLaunchJob() throws Exception {
        // when
        JobExecution jobExecution = this.jobLauncherTestUtils.launchJob();

        // then
        Assert.assertEquals(ExitStatus.COMPLETED.getExitCode(), jobExecution.getExitStatus().getExitCode());
        Assert.assertEquals(
                "Waited for 2 results.", // the master sent 2 chunks ({1, 2, 3} and {4, 5, 6}) to workers
                jobExecution.getExitStatus().getExitDescription());
    }

    /**
     * Master side: defines the job, its single chunk-oriented step and the messaging
     * infrastructure that connects the step's writer to the JMS "requests" and "replies"
     * destinations.
     */
    @Configuration
    @EnableBatchProcessing
    @EnableIntegration
    public static class MasterConfiguration {

        @Autowired
        private JobBuilderFactory jobBuilderFactory;

        @Autowired
        private StepBuilderFactory stepBuilderFactory;

        /**
         * @return a connection factory pointing at the embedded broker
         */
        @Bean
        public ActiveMQConnectionFactory jmsConnectionFactory() {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
            connectionFactory.setBrokerURL(BROKER_URL);
            // NOTE(review): trusting all packages disables ActiveMQ's restriction on which classes
            // may be deserialized from object messages. Tolerable here only because the broker is
            // a test-local embedded instance; restrict the trusted packages in real deployments.
            connectionFactory.setTrustAllPackages(true);
            return connectionFactory;
        }

        /**
         * @return the channel on which the master publishes chunk requests
         */
        @Bean
        public DirectChannel requests() {
            return new DirectChannel();
        }

        /**
         * Forwards every message sent to the {@code requests} channel to the JMS
         * destination named "requests".
         *
         * @param jmsConnectionFactory connection factory used by the outbound adapter
         * @param requests channel the flow reads from
         * @return the outbound integration flow
         */
        @Bean
        public IntegrationFlow outboundFlow(ActiveMQConnectionFactory jmsConnectionFactory, DirectChannel requests) {
            return IntegrationFlows
                    .from(requests)
                    .handle(Jms.outboundAdapter(jmsConnectionFactory).destination("requests"))
                    .get();
        }

        /**
         * @return the pollable channel on which the master receives replies
         */
        @Bean
        public QueueChannel replies() {
            return new QueueChannel();
        }

        /**
         * Moves every message received from the JMS destination named "replies" onto the
         * {@code replies} channel.
         *
         * @param jmsConnectionFactory connection factory used by the message-driven adapter
         * @param replies channel the flow writes to
         * @return the inbound integration flow
         */
        @Bean
        public IntegrationFlow inboundFlow(ActiveMQConnectionFactory jmsConnectionFactory, PollableChannel replies) {
            return IntegrationFlows
                    .from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory).destination("replies"))
                    .channel(replies)
                    .get();
        }

        /**
         * @return a messaging template whose default channel is {@link #requests()}
         */
        @Bean
        public MessagingTemplate messagingTemplate() {
            MessagingTemplate messagingTemplate = new MessagingTemplate();
            messagingTemplate.setDefaultChannel(requests());
            return messagingTemplate;
        }

        /**
         * @return the master step's writer: sends through {@link #messagingTemplate()} and
         * uses {@link #replies()} as its reply channel
         */
        @Bean
        public ItemWriter<Integer> itemWriter() {
            ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter<>();
            chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate());
            chunkMessageChannelItemWriter.setReplyChannel(replies());
            return chunkMessageChannelItemWriter;
        }

        /**
         * @return a factory bean configured with the chunk writer and the master step
         */
        @Bean
        public RemoteChunkHandlerFactoryBean<Integer> chunkHandler() {
            RemoteChunkHandlerFactoryBean<Integer> remoteChunkHandlerFactoryBean = new RemoteChunkHandlerFactoryBean<>();
            remoteChunkHandlerFactoryBean.setChunkWriter(itemWriter());
            remoteChunkHandlerFactoryBean.setStep(masterStep());
            return remoteChunkHandlerFactoryBean;
        }

        /**
         * @return the master step: chunks of 3 items read from {@link #itemReader()} and
         * handed to {@link #itemWriter()}
         */
        @Bean
        public TaskletStep masterStep() {
            return this.stepBuilderFactory.get("masterStep")
                    .<Integer, Integer>chunk(3)
                    .reader(itemReader())
                    .writer(itemWriter())
                    .build();
        }

        /**
         * @return a reader over the fixed list 1..6 (two chunks of three items)
         */
        @Bean
        public ItemReader<Integer> itemReader() {
            return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6));
        }

        /**
         * @return the job under test, consisting of the master step only
         */
        @Bean
        public Job remoteChunkingJob() {
            return this.jobBuilderFactory.get("remoteChunkingJob")
                    .start(masterStep())
                    .build();
        }
    }

    /**
     * Worker side: receives chunk requests from the JMS "requests" destination, processes
     * and writes the items, and sends the outcome to the JMS "replies" destination.
     */
    @Configuration
    @EnableBatchProcessing
    @EnableIntegration
    public static class WorkerConfiguration {

        /**
         * @return a connection factory pointing at the embedded broker
         */
        @Bean
        public ActiveMQConnectionFactory jmsConnectionFactory() {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
            connectionFactory.setBrokerURL(BROKER_URL);
            // NOTE(review): see the same setting on the master side; trusting all packages is
            // only tolerable for a test-local embedded broker.
            connectionFactory.setTrustAllPackages(true);
            return connectionFactory;
        }

        /**
         * @return the channel on which incoming chunk requests are delivered to the handler
         */
        @Bean
        public DirectChannel requests() {
            return new DirectChannel();
        }

        /**
         * @return the channel on which the handler publishes its replies
         */
        @Bean
        public DirectChannel replies() {
            return new DirectChannel();
        }

        /**
         * Moves every message received from the JMS destination named "requests" onto the
         * {@link #requests()} channel.
         *
         * @param jmsConnectionFactory connection factory used by the message-driven adapter
         * @return the inbound integration flow
         */
        @Bean
        public IntegrationFlow incomingRequests(ActiveMQConnectionFactory jmsConnectionFactory) {
            return IntegrationFlows
                    .from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory).destination("requests"))
                    .channel(requests())
                    .get();
        }

        /**
         * Forwards every message sent to the {@link #replies()} channel to the JMS
         * destination named "replies".
         *
         * @param jmsConnectionFactory connection factory used by the outbound adapter
         * @return the outbound integration flow
         */
        @Bean
        public IntegrationFlow outgoingReplies(ActiveMQConnectionFactory jmsConnectionFactory) {
            return IntegrationFlows
                    .from(replies())
                    .handle(Jms.outboundAdapter(jmsConnectionFactory).destination("replies"))
                    .get();
        }

        /**
         * @return a chunk processor combining {@link #itemProcessor()} and {@link #itemWriter()}
         */
        @Bean
        public ChunkProcessor<Integer> chunkProcessor() {
            return new SimpleChunkProcessor<>(itemProcessor(), itemWriter());
        }

        /**
         * @return the handler bound as a service activator between the "requests" and
         * "replies" channels, delegating to {@link #chunkProcessor()}
         */
        @Bean
        @ServiceActivator(inputChannel = "requests", outputChannel = "replies")
        public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() {
            ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler = new ChunkProcessorChunkHandler<>();
            chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor());
            return chunkProcessorChunkHandler;
        }

        /**
         * @return a writer that prints each item to standard output
         */
        @Bean
        public ItemWriter<Integer> itemWriter() {
            return items -> {
                for (Integer item : items) {
                    System.out.println("writing item " + item);
                }
            };
        }

        /**
         * @return a pass-through processor that prints each item to standard output and
         * returns it unchanged
         */
        @Bean
        public ItemProcessor<Integer, Integer> itemProcessor() {
            return item -> {
                System.out.println("processing item " + item);
                return item;
            };
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment