Skip to content

Instantly share code, notes, and snippets.

@Toparvion
Created March 17, 2017 17:51
Show Gist options
  • Save Toparvion/348c184ca7797d749ece33329da74e7c to your computer and use it in GitHub Desktop.
Save Toparvion/348c184ca7797d749ece33329da74e7c to your computer and use it in GitHub Desktop.
Code snippet for answer to http://stackoverflow.com/q/42562053/3507435
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.dsl.AggregatorSpec;
import org.springframework.integration.store.MessageGroup;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import java.math.BigDecimal;
import java.util.Objects;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.stream.Collectors.joining;
import static org.springframework.integration.IntegrationMessageHeaderAccessor.CORRELATION_ID;
/**
 * Configures an aggregator that puts every incoming message into one single group and releases that group as
 * soon as the newest message carries a different {@code CORRELATION_ID} header than the message before it.
 * <p>
 * The message that triggered the release is not part of the emitted record: it is removed from the group and
 * sent back to the channel given to the constructor. The remaining payloads are joined with a line feed into
 * one output message.
 */
public class AggregatorConfigurer {
    private static final Logger log = LoggerFactory.getLogger(AggregatorConfigurer.class);

    /** Sends the release-triggering message back to the pre-aggregator channel; created once and reused. */
    private final MessagingTemplate preAggregatorTemplate;

    /**
     * @param preAggregatorQueueChannel channel to which the message that triggered a release is sent back
     */
    public AggregatorConfigurer(MessageChannel preAggregatorQueueChannel) {
        this.preAggregatorTemplate = new MessagingTemplate(preAggregatorQueueChannel);
    }

    /**
     * Applies the correlation strategy, release strategy, output processor, poller and id to the given spec.
     *
     * @param spec aggregator specification to configure
     */
    public void configure(AggregatorSpec spec) {
        // The correlation key is a constant, so all messages land in the same group;
        // the real grouping decision is taken by the release strategy below.
        spec.correlationStrategy(message -> BigDecimal.ONE)
                .releaseStrategy(this::releaseStrategy)
                .outputProcessor(this::outputProcessor)
                .poller(p -> p.fixedDelay(50, MILLISECONDS))
                .id("recordAggregator");
    }

    /**
     * Decides whether the group is complete by comparing the {@code CORRELATION_ID} headers of its two most
     * recent messages.
     *
     * @param group group to examine
     * @return {@code true} if the last message has a different correlation id than the one before it;
     *         {@code false} if the ids are equal or the group holds fewer than two messages
     */
    private boolean releaseStrategy(MessageGroup group) {
        if (group.size() < 2) {
            return false; // nothing to compare the newest message with yet
        }
        Tuple2<Message<?>, Message<?>> prevAndLastMessages = findPrevAndLastMessages(group);
        Message<?> lastMessage = prevAndLastMessages.getT1();
        Message<?> second2LastMessage = prevAndLastMessages.getT2();
        Long lastMessageCorrId = lastMessage.getHeaders().get(CORRELATION_ID, Long.class);
        Long prevMessageCorrId = second2LastMessage.getHeaders().get(CORRELATION_ID, Long.class);
        boolean isGroupComplete = !Objects.equals(lastMessageCorrId, prevMessageCorrId);
        if (isGroupComplete) {
            log.debug("Group {} is about to be released as last corrId {} differs from previous ones {}.",
                    group.getGroupId(), lastMessageCorrId, prevMessageCorrId);
        }
        return isGroupComplete;
    }

    /**
     * Builds the output for a released group.
     * <p>
     * A singleton group is emitted as a copy of its only message. Otherwise the last message is removed from
     * the group and sent back to the pre-aggregator channel, and the remaining messages are composed into
     * one record.
     *
     * @param group released group; must not be empty
     * @return the message to emit
     * @throws IllegalStateException if the group is empty
     */
    private Object outputProcessor(MessageGroup group) {
        if (group.size() == 0) {
            throw new IllegalStateException("Cannot build a record from empty group " + group.getGroupId());
        }
        if (group.size() == 1) { // the singleton group is a special case and must be handled separately
            return MessageBuilder.fromMessage(group.getOne()).build();
        }
        Message<?> lastMessage = findPrevAndLastMessages(group).getT1();
        // The last message is the one whose differing correlation id triggered the release,
        // so it is taken out of this record and sent back to the pre-aggregator channel.
        group.remove(lastMessage);
        preAggregatorTemplate.send(lastMessage);
        return composeRecord(group);
    }

    /**
     * Joins the string form of every payload in the group with {@code "\n"} into a single message.
     *
     * @param group group whose messages form the record
     * @return message with the joined payload; headers are copied from {@code group.getOne()} where absent
     */
    private Object composeRecord(MessageGroup group) {
        String payload = group.getMessages()
                .stream()
                .map(Message::getPayload)
                .map(Object::toString)
                .collect(joining("\n"));
        return MessageBuilder
                .withPayload(payload)
                .copyHeadersIfAbsent(group.getOne().getHeaders())
                .build();
    }

    /**
     * Finds the two messages that come last when iterating over {@code group.getMessages()}.
     * NOTE(review): this presumes the iteration order equals arrival order - confirm for the message store
     * in use.
     *
     * @param group group holding at least two messages
     * @return last (T1) and second to last (T2) messages of the given group; neither is {@code null}
     * @throws IllegalArgumentException if the group contains fewer than two messages (a {@code Tuple2}
     *         cannot hold {@code null} components, so such a pair cannot be represented)
     */
    private Tuple2<Message<?>, Message<?>> findPrevAndLastMessages(MessageGroup group) {
        Message<?> lastMessage = null;
        Message<?> second2LastMessage = null;
        for (Message<?> message : group.getMessages()) {
            second2LastMessage = lastMessage;
            lastMessage = message;
        }
        if (lastMessage == null || second2LastMessage == null) {
            throw new IllegalArgumentException(
                    "Group " + group.getGroupId() + " must contain at least 2 messages but has " + group.size());
        }
        return Tuples.of(lastMessage, second2LastMessage);
    }
}
// A fragment of a typical @Configuration class showing how to use the aggregator.
// The same queue channel is handed to the configurer and placed in front of the aggregator in the flow,
// so the message that the output processor sends back reaches the aggregator again.
MessageChannel preAggregatorQueueChannel = MessageChannels.queue(RECORD_AGGREGATOR_INPUT_CHANNEL).get();
AggregatorConfigurer recordAggregatorConfigurer = new AggregatorConfigurer(preAggregatorQueueChannel);
return IntegrationFlows
        .from(...)
        .channel(preAggregatorQueueChannel)
        .aggregate(recordAggregatorConfigurer::configure)
        .handle(...)
        .get();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment