Skip to content

Instantly share code, notes, and snippets.

@kelapure
Created July 11, 2017 19:16
Show Gist options
  • Save kelapure/49221edcf9ebb4c09fdc7124621b6d71 to your computer and use it in GitHub Desktop.
Save kelapure/49221edcf9ebb4c09fdc7124621b6d71 to your computer and use it in GitHub Desktop.
Mystery of The alternate messages
package com.lmig.pli.rate.auto.autorating.event;
import com.lmig.pli.rate.auto.autorating.config.EventAutoConfig;
import com.lmig.pli.rate.auto.autorating.service.MessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.util.Pair;
import org.springframework.integration.annotation.Aggregator;
import org.springframework.integration.annotation.CorrelationStrategy;
import org.springframework.integration.annotation.ReleaseStrategy;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
public class EventHandlerImpl implements EventHandler {
private final MessageService messageService;
private final EventAutoConfig.MessageSync messageSync;
@Autowired
public EventHandlerImpl(final MessageService messageService, final EventAutoConfig.MessageSync messageSync) {
this.messageService = messageService;
this.messageSync = messageSync;
}
/* EVENT BRIDGING: The following @ServiceActivator methods move events from multiple channels to a single channel. */
/*
* Provide a @ServiceActivator for each input channel that your Aggregator requires an event to be read.
* Each ServiceActivator must bridge that event to the same common aggregation channel, as the Aggregator
* can only read from a single channel to perform its correlation.
*/
@ServiceActivator(inputChannel = EventChannels.RATEABLE_QUOTE_IN, outputChannel = EventChannels.AUTO_PRICING_AGGREGATE_OUT, autoStartup = "true")
@Override
public Message<?> rateableQuoteBridge(Message<?> message) {
return channelBridge(message, EventChannels.RATEABLE_QUOTE_IN);
}
@ServiceActivator(inputChannel = EventChannels.RISK_ACCEPTED_IN, outputChannel = EventChannels.AUTO_PRICING_AGGREGATE_OUT, autoStartup = "true")
@Override
public Message<?> riskAcceptedBridge(Message<?> message) {
return channelBridge(message, EventChannels.RISK_ACCEPTED_IN);
}
@ServiceActivator(inputChannel = EventChannels.RATE_PROVISIONED_IN, outputChannel = EventChannels.AUTO_PRICING_AGGREGATE_OUT, autoStartup = "true")
@Override
public Message<?> rateProvisionedBridge(Message<?> message) {
return channelBridge(message, EventChannels.RATE_PROVISIONED_IN);
}
private Message<?> channelBridge(Message<?> message, String channelName) {
log.debug("Received event from {} channel with processId {}, placing it on the aggregator channel.", channelName, String.valueOf(message.getHeaders().get(EventHandler.PROCESS_ID)));
// Update each incoming bridged message with an additional header that can be used to determine event type during aggregation.
return MessageBuilder.withPayload(message.getPayload()).copyHeaders(message.getHeaders()).setHeader(EVENT_TYPE_HEADER, channelName).build();
}
/* EVENT AGGREGATION: The following @CorrelationStrategy, @ReleaseStrategy, and @Aggregator combine to read
* multiple events from a single channel and perform business logic to publish a resulting output event once a set of criteria are met.
*/
/* This identifies a message's correlation key. In this implementation, we're using a message header `processId`,
* but this could perform event deserialization to pull data from the message body if more complex correlation is needed.
*/
@CorrelationStrategy
@Override
public Object correlateBy(Message message) {
log.debug("Correlating message with id: {}", message.getHeaders().get(EventHandler.PROCESS_ID));
// extract 'processId' message header
return message.getHeaders().get(EventHandler.PROCESS_ID);
}
/**
* This checks to see when a collection of messages with the same
* correlation key is ready for aggregation logic. In this implementation,
* we check that one message from each input channel has been received.
*/
@ReleaseStrategy
@Override
public boolean isComplete(List<Message<?>> messages) {
boolean unrateableQuote = false;
boolean riskAccepted = false;
boolean rateProvisioned = false;
try {
for (Message<?> message : messages) {
MessageHeaders headers = message.getHeaders();
String eventType = headers.get(EVENT_TYPE_HEADER).toString();
if (eventType.equalsIgnoreCase(EventChannels.RATEABLE_QUOTE_IN)) {
RateableQuoteEvent event = messageService.parse(message, RateableQuoteEvent.class);
if (event.getType().equals(RateableQuoteEventType.UNRATEABLE_QUOTE)) {
unrateableQuote = true;
}
} else if (eventType.equalsIgnoreCase(EventChannels.RISK_ACCEPTED_IN)) {
RiskAcceptedEvent event = messageService.parse(message, RiskAcceptedEvent.class);
if (event.getType().equals(RiskAcceptedEventType.UNDERWRITING_RISK_ACCEPTED)) {
riskAccepted = true;
}
} else if (eventType.equalsIgnoreCase(EventChannels.RATE_PROVISIONED_IN)) {
RateProvisionedEvent event = messageService.parse(message, RateProvisionedEvent.class);
if (event.getType().equals(RateProvisionedEventType.RATE_PROVISIONED)) {
rateProvisioned = true;
}
}
}
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return unrateableQuote || (riskAccepted && rateProvisioned);
}
/**
* This method takes a completed message group and calls the underlying domain service
* to perform required business logic to produce the output event.
*/
@Aggregator(inputChannel = EventChannels.AUTO_PRICING_AGGREGATE_OUT, outputChannel = EventChannels.AUTO_PRICING_AGGREGATE_OUT)
@Override
public Message<?> aggregate(List<Message<?>> messages) {
try {
RateableQuoteEvent unrateableQuoteEvent = null;
RiskAcceptedEvent riskAcceptedEvent = null;
RateProvisionedEvent rateProvisionedEvent = null;
// Pull out the event of each type for use in calling the domain service.
// In the case of duplicate messages, this will take the last instance of a message from a given channel.
for (Message<?> message : messages) {
String eventType = message.getHeaders().get(EVENT_TYPE_HEADER).toString();
if (eventType.equalsIgnoreCase(EventChannels.RATEABLE_QUOTE_IN)) {
RateableQuoteEvent event = messageService.parse(message, RateableQuoteEvent.class);
if (event.getType().equals(RateableQuoteEventType.UNRATEABLE_QUOTE)) {
unrateableQuoteEvent = event;
}
} else if (eventType.equalsIgnoreCase(EventChannels.RISK_ACCEPTED_IN)) {
RiskAcceptedEvent event = messageService.parse(message, RiskAcceptedEvent.class);
if (event.getType().equals(RiskAcceptedEventType.UNDERWRITING_RISK_ACCEPTED)) {
riskAcceptedEvent = event;
}
} else if (eventType.equalsIgnoreCase(EventChannels.RATE_PROVISIONED_IN)) {
RateProvisionedEvent event = messageService.parse(message, RateProvisionedEvent.class);
if (event.getType().equals(RateProvisionedEventType.RATE_PROVISIONED)) {
rateProvisionedEvent = event;
}
}
}
//get first event and extract processId since events are correlated already
String processId = String.valueOf(messages.get(0).getHeaders().get(EventHandler.PROCESS_ID));
if (unrateableQuoteEvent != null) {
messageSync.put(processId, unrateableQuoteEvent);
} else {
messageSync.put(processId, Pair.of(riskAcceptedEvent, rateProvisionedEvent));
}
return MessageBuilder.withPayload("OK")
.setHeader(EventHandler.PROCESS_ID, "SUCCESS")
.setHeader(EventHandler.EVENT_TYPE_HEADER, "successChannel") //this causes the aggregator methods to ignore this message during grouping
.build();
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return null;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment