Created
January 12, 2015 11:27
-
-
Save artembilan/9ffba1bb0b18842e27cf to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Configuration | |
@EnableIntegration | |
public class IntegrationConfig { | |
... | |
@Bean | |
public IntegrationFlow reqFlow() { | |
return IntegrationFlows | |
.from("request.ch") | |
.enrichHeaders(e -> { | |
e.headerChannelsToString(); | |
}) | |
.enrichHeaders(e -> { | |
e.messageProcessor(m -> { | |
Map<String, Object> map = new HashMap<String, Object>(1); | |
map.put(JmsHeaders.CORRELATION_ID, m.getHeaders().get(MessageHeaders.REPLY_CHANNEL)); | |
return map; | |
} ); | |
}) | |
.routeToRecipients(r -> { | |
r.ignoreSendFailures(false); | |
r.recipient("jms.req.ch", "true"); | |
r.recipient("jms.agg.ch", "true"); | |
}) | |
.get(); | |
} | |
@Bean | |
public IntegrationFlow jmsReqFlow() { | |
return IntegrationFlows | |
.from("jms.req.ch") | |
.enrichHeaders( e -> { e.headerChannelsToString(); }) | |
.handle(Jms.outboundAdapter(cachingConnectionFactory) | |
.destination("TEST_REQUEST_CH")).get(); | |
} | |
@Bean | |
public IntegrationFlow jmsPostReqFlow() { | |
return IntegrationFlows | |
.from("jms.req.ch") | |
.handle("postSendService", "postSendProcess") | |
.get(); | |
} | |
@Bean | |
public IntegrationFlow jmsResFlow() { | |
return IntegrationFlows | |
.from(Jms.inboundAdapter(cachingConnectionFactory).destination( | |
"TEST_RESPONSE_CH"), | |
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(10))) | |
.channel("jms.agg.ch").get(); | |
} | |
@Bean | |
public IntegrationFlow jmsAggFlow() { | |
return IntegrationFlows | |
.from("jms.agg.ch") | |
.aggregate(a -> { | |
a.outputProcessor(g -> { | |
List<Message<?>> l = new ArrayList<Message<?>>(g.getMessages()); | |
Message<?> firstMessage = l.get(0); | |
Message<?> lastMessage = (l.size() > 1) ? l.get(l.size() - 1) : firstMessage; | |
Message<?> messageOut = MessageBuilder.fromMessage(lastMessage) | |
.setHeader(MessageHeaders.REPLY_CHANNEL, (String) firstMessage.getHeaders().getReplyChannel()) | |
.build(); | |
return messageOut; | |
}); | |
a.correlationStrategy( m -> { | |
return m.getHeaders().get(JmsHeaders.CORRELATION_ID); | |
}); | |
a.releaseStrategy(g -> { | |
return g.size() == 2; | |
}); | |
a.groupTimeout(45000); | |
a.sendPartialResultOnExpiry(false); | |
a.discardChannel("jms.agg.timeout.ch"); | |
}, null) | |
.channel("response.ch") | |
.get(); | |
} | |
} | |
@Bean | |
public IntegrationFlow jmsAggTimeoutFlow() { | |
return IntegrationFlows | |
.from("jms.agg.timeout.ch") | |
.handle(Message.class, (m, n) -> { | |
MessageTimeoutException mExc = new MessageTimeoutException(m); | |
Message<?> mOut = MessageBuilder.withPayload(mExc) | |
.copyHeaders(m.getHeaders()) | |
.build(); | |
return mOut; | |
}) | |
.channel("error.ch") | |
.get(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
https://gist.github.com/artembilan/9ffba1bb0b18842e27cf#file-so-27405495-java-L18: there is no reason to use `messageProcessor` here. The `CORRELATION_ID` can be populated with a normal `.header()`. At the same time, you can just use `IntegrationMessageHeaderAccessor.CORRELATION_ID` and avoid polluting the JMS header. With the standard `IntegrationMessageHeaderAccessor.CORRELATION_ID` there would be no reason to have `.correlationStrategy()` on the aggregator.
https://gist.github.com/artembilan/9ffba1bb0b18842e27cf#file-so-27405495-java-L35: this is redundant.
On the other hand, if you don't use a `Gateway`, there is no reason to call `.headerChannelsToString()`, because there is no `TemporaryReplyChannel`.
https://gist.github.com/artembilan/9ffba1bb0b18842e27cf#file-so-27405495-java-L80: it can be just a simple lambda expression:
a.releaseStrategy(g -> g.size() == 2);
https://gist.github.com/artembilan/9ffba1bb0b18842e27cf#file-so-27405495-java-L98: it would be better to use `ErrorMessage` here.
That's all.
Have a good time!