Skip to content

Instantly share code, notes, and snippets.

@artembilan
Created January 12, 2015 11:27
Show Gist options
  • Save artembilan/9ffba1bb0b18842e27cf to your computer and use it in GitHub Desktop.
Save artembilan/9ffba1bb0b18842e27cf to your computer and use it in GitHub Desktop.
@Configuration
@EnableIntegration
public class IntegrationConfig {
...
@Bean
public IntegrationFlow reqFlow() {
return IntegrationFlows
.from("request.ch")
.enrichHeaders(e -> {
e.headerChannelsToString();
})
.enrichHeaders(e -> {
e.messageProcessor(m -> {
Map<String, Object> map = new HashMap<String, Object>(1);
map.put(JmsHeaders.CORRELATION_ID, m.getHeaders().get(MessageHeaders.REPLY_CHANNEL));
return map;
} );
})
.routeToRecipients(r -> {
r.ignoreSendFailures(false);
r.recipient("jms.req.ch", "true");
r.recipient("jms.agg.ch", "true");
})
.get();
}
@Bean
public IntegrationFlow jmsReqFlow() {
return IntegrationFlows
.from("jms.req.ch")
.enrichHeaders( e -> { e.headerChannelsToString(); })
.handle(Jms.outboundAdapter(cachingConnectionFactory)
.destination("TEST_REQUEST_CH")).get();
}
@Bean
public IntegrationFlow jmsPostReqFlow() {
return IntegrationFlows
.from("jms.req.ch")
.handle("postSendService", "postSendProcess")
.get();
}
@Bean
public IntegrationFlow jmsResFlow() {
return IntegrationFlows
.from(Jms.inboundAdapter(cachingConnectionFactory).destination(
"TEST_RESPONSE_CH"),
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(10)))
.channel("jms.agg.ch").get();
}
@Bean
public IntegrationFlow jmsAggFlow() {
return IntegrationFlows
.from("jms.agg.ch")
.aggregate(a -> {
a.outputProcessor(g -> {
List<Message<?>> l = new ArrayList<Message<?>>(g.getMessages());
Message<?> firstMessage = l.get(0);
Message<?> lastMessage = (l.size() > 1) ? l.get(l.size() - 1) : firstMessage;
Message<?> messageOut = MessageBuilder.fromMessage(lastMessage)
.setHeader(MessageHeaders.REPLY_CHANNEL, (String) firstMessage.getHeaders().getReplyChannel())
.build();
return messageOut;
});
a.correlationStrategy( m -> {
return m.getHeaders().get(JmsHeaders.CORRELATION_ID);
});
a.releaseStrategy(g -> {
return g.size() == 2;
});
a.groupTimeout(45000);
a.sendPartialResultOnExpiry(false);
a.discardChannel("jms.agg.timeout.ch");
}, null)
.channel("response.ch")
.get();
}
}
@Bean
public IntegrationFlow jmsAggTimeoutFlow() {
return IntegrationFlows
.from("jms.agg.timeout.ch")
.handle(Message.class, (m, n) -> {
MessageTimeoutException mExc = new MessageTimeoutException(m);
Message<?> mOut = MessageBuilder.withPayload(mExc)
.copyHeaders(m.getHeaders())
.build();
return mOut;
})
.channel("error.ch")
.get();
}
}
@artembilan
Copy link
Author

https://gist.github.com/artembilan/9ffba1bb0b18842e27cf#file-so-27405495-java-L18: no reason to use here messageProcessor. The CORRELATION_ID can be populated as normal .header(). At the same time you can just use IntegrationMessageHeaderAccessor.CORRELATION_ID and don't pollute JMS header. Using standard IntegrationMessageHeaderAccessor.CORRELATION_ID there was no reason to have .correlationStrategy() on the aggregator.
https://gist.github.com/artembilan/9ffba1bb0b18842e27cf#file-so-27405495-java-L35: this is redundant.
From other side, if you don't use Gateway, there is no reason to .headerChannelsToString(), because there is no TemporaryReplyChannel.
https://gist.github.com/artembilan/9ffba1bb0b18842e27cf#file-so-27405495-java-L80: it can be just simple Lambda expression:a.releaseStrategy(g -> g.size() == 2);
https://gist.github.com/artembilan/9ffba1bb0b18842e27cf#file-so-27405495-java-L98: would be better if you use here ErrorMessage:

 .handle(Message.class, (m, h) -> new ErrorMessage(new MessageTimeoutException(m), h)) 

That's all.

Have a good time!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment