Last active
May 24, 2018 20:37
-
-
Save fmbenhassine/98a0b48b563d190814acbaa20cad662c to your computer and use it in GitHub Desktop.
Spring Batch "remote" chunking sample with embedded JMS broker #SpringBatch
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * Copyright 2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.batch.sample; | |
import java.util.Arrays; | |
import org.apache.activemq.ActiveMQConnectionFactory; | |
import org.apache.activemq.broker.BrokerService; | |
import org.junit.After; | |
import org.junit.Assert; | |
import org.junit.Before; | |
import org.junit.Test; | |
import org.junit.runner.RunWith; | |
import org.springframework.batch.core.ExitStatus; | |
import org.springframework.batch.core.Job; | |
import org.springframework.batch.core.JobExecution; | |
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; | |
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; | |
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; | |
import org.springframework.batch.core.step.item.ChunkProcessor; | |
import org.springframework.batch.core.step.item.SimpleChunkProcessor; | |
import org.springframework.batch.core.step.tasklet.TaskletStep; | |
import org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter; | |
import org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler; | |
import org.springframework.batch.integration.chunk.RemoteChunkHandlerFactoryBean; | |
import org.springframework.batch.item.ItemProcessor; | |
import org.springframework.batch.item.ItemReader; | |
import org.springframework.batch.item.ItemWriter; | |
import org.springframework.batch.item.support.ListItemReader; | |
import org.springframework.batch.sample.config.JobRunnerConfiguration; | |
import org.springframework.batch.test.JobLauncherTestUtils; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.context.annotation.AnnotationConfigApplicationContext; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.integration.annotation.ServiceActivator; | |
import org.springframework.integration.channel.DirectChannel; | |
import org.springframework.integration.channel.QueueChannel; | |
import org.springframework.integration.config.EnableIntegration; | |
import org.springframework.integration.core.MessagingTemplate; | |
import org.springframework.integration.dsl.IntegrationFlow; | |
import org.springframework.integration.dsl.IntegrationFlows; | |
import org.springframework.integration.jms.dsl.Jms; | |
import org.springframework.messaging.PollableChannel; | |
import org.springframework.test.context.ContextConfiguration; | |
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; | |
/** | |
* The master step of the tested job will read data and send chunks to the worker | |
* (started in {@link RemoteChunkingJobFunctionalTests#setUp()}) for processing and writing. | |
*/ | |
@RunWith(SpringJUnit4ClassRunner.class) | |
@ContextConfiguration(classes = { | |
JobRunnerConfiguration.class, | |
RemoteChunkingJobFunctionalTests.MasterConfiguration.class}) | |
public class RemoteChunkingJobFunctionalTests { | |
private static final String BROKER_URL = "tcp://localhost:61616"; | |
@Autowired | |
private JobLauncherTestUtils jobLauncherTestUtils; | |
private BrokerService brokerService; | |
private AnnotationConfigApplicationContext workerApplicationContext; | |
@Before | |
public void setUp() throws Exception { | |
this.brokerService = new BrokerService(); | |
this.brokerService.addConnector(BROKER_URL); | |
this.brokerService.start(); | |
this.workerApplicationContext = new AnnotationConfigApplicationContext(WorkerConfiguration.class); | |
} | |
@After | |
public void tearDown() throws Exception { | |
this.workerApplicationContext.close(); | |
this.brokerService.stop(); | |
} | |
@Test | |
public void testLaunchJob() throws Exception { | |
// when | |
JobExecution jobExecution = this.jobLauncherTestUtils.launchJob(); | |
// then | |
Assert.assertEquals(ExitStatus.COMPLETED.getExitCode(), jobExecution.getExitStatus().getExitCode()); | |
Assert.assertEquals( | |
"Waited for 2 results.", // the master sent 2 chunks ({1, 2, 3} and {4, 5, 6}) to workers | |
jobExecution.getExitStatus().getExitDescription()); | |
} | |
@Configuration | |
@EnableBatchProcessing | |
@EnableIntegration | |
public static class MasterConfiguration { | |
@Autowired | |
private JobBuilderFactory jobBuilderFactory; | |
@Autowired | |
private StepBuilderFactory stepBuilderFactory; | |
@Bean | |
public ActiveMQConnectionFactory jmsConnectionFactory() { | |
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); | |
connectionFactory.setBrokerURL(BROKER_URL); | |
connectionFactory.setTrustAllPackages(true); | |
return connectionFactory; | |
} | |
@Bean | |
public DirectChannel requests() { | |
return new DirectChannel(); | |
} | |
@Bean | |
public IntegrationFlow outboundFlow(ActiveMQConnectionFactory jmsConnectionFactory, DirectChannel requests) { | |
return IntegrationFlows | |
.from(requests) | |
.handle(Jms.outboundAdapter(jmsConnectionFactory).destination("requests")) | |
.get(); | |
} | |
@Bean | |
public QueueChannel replies() { | |
return new QueueChannel(); | |
} | |
@Bean | |
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory jmsConnectionFactory, PollableChannel replies) { | |
return IntegrationFlows | |
.from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory).destination("replies")) | |
.channel(replies) | |
.get(); | |
} | |
@Bean | |
public MessagingTemplate messagingTemplate() { | |
MessagingTemplate messagingTemplate = new MessagingTemplate(); | |
messagingTemplate.setDefaultChannel(requests()); | |
return messagingTemplate; | |
} | |
@Bean | |
public ItemWriter<Integer> itemWriter() { | |
ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter<>(); | |
chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate()); | |
chunkMessageChannelItemWriter.setReplyChannel(replies()); | |
return chunkMessageChannelItemWriter; | |
} | |
@Bean | |
public RemoteChunkHandlerFactoryBean<Integer> chunkHandler() { | |
RemoteChunkHandlerFactoryBean<Integer> remoteChunkHandlerFactoryBean = new RemoteChunkHandlerFactoryBean<>(); | |
remoteChunkHandlerFactoryBean.setChunkWriter(itemWriter()); | |
remoteChunkHandlerFactoryBean.setStep(masterStep()); | |
return remoteChunkHandlerFactoryBean; | |
} | |
@Bean | |
public TaskletStep masterStep() { | |
return this.stepBuilderFactory.get("masterStep") | |
.<Integer, Integer>chunk(3) | |
.reader(itemReader()) | |
.writer(itemWriter()) | |
.build(); | |
} | |
@Bean | |
public ItemReader<Integer> itemReader() { | |
return new ListItemReader<>(Arrays.asList(1, 2, 3, 4, 5, 6)); | |
} | |
@Bean | |
public Job remoteChunkingJob() { | |
return this.jobBuilderFactory.get("remoteChunkingJob") | |
.start(masterStep()) | |
.build(); | |
} | |
} | |
@Configuration | |
@EnableBatchProcessing | |
@EnableIntegration | |
public static class WorkerConfiguration { | |
@Bean | |
public ActiveMQConnectionFactory jmsConnectionFactory() { | |
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); | |
connectionFactory.setBrokerURL(BROKER_URL); | |
connectionFactory.setTrustAllPackages(true); | |
return connectionFactory; | |
} | |
@Bean | |
public DirectChannel requests() { | |
return new DirectChannel(); | |
} | |
@Bean | |
public DirectChannel replies() { | |
return new DirectChannel(); | |
} | |
@Bean | |
public IntegrationFlow incomingRequests(ActiveMQConnectionFactory jmsConnectionFactory) { | |
return IntegrationFlows | |
.from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory).destination("requests")) | |
.channel(requests()) | |
.get(); | |
} | |
@Bean | |
public IntegrationFlow outgoingReplies(ActiveMQConnectionFactory jmsConnectionFactory) { | |
return IntegrationFlows | |
.from(replies()) | |
.handle(Jms.outboundAdapter(jmsConnectionFactory).destination("replies")) | |
.get(); | |
} | |
@Bean | |
public ChunkProcessor<Integer> chunkProcessor() { | |
return new SimpleChunkProcessor<>(itemProcessor(), itemWriter()); | |
} | |
@Bean | |
@ServiceActivator(inputChannel = "requests", outputChannel = "replies") | |
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler() { | |
ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler = new ChunkProcessorChunkHandler<>(); | |
chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor()); | |
return chunkProcessorChunkHandler; | |
} | |
@Bean | |
public ItemWriter<Integer> itemWriter() { | |
return items -> { | |
for (Integer item : items) { | |
System.out.println("writing item " + item); | |
} | |
}; | |
} | |
@Bean | |
public ItemProcessor<Integer, Integer> itemProcessor() { | |
return item -> { | |
System.out.println("processing item " + item); | |
return item; | |
}; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment