Created
July 11, 2017 19:16
-
-
Save kelapure/49221edcf9ebb4c09fdc7124621b6d71 to your computer and use it in GitHub Desktop.
Mystery of The alternate messages
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.lmig.pli.rate.auto.autorating.event; | |
import com.lmig.pli.rate.auto.autorating.config.EventAutoConfig; | |
import com.lmig.pli.rate.auto.autorating.service.MessageService; | |
import lombok.extern.slf4j.Slf4j; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.data.util.Pair; | |
import org.springframework.integration.annotation.Aggregator; | |
import org.springframework.integration.annotation.CorrelationStrategy; | |
import org.springframework.integration.annotation.ReleaseStrategy; | |
import org.springframework.integration.annotation.ServiceActivator; | |
import org.springframework.messaging.Message; | |
import org.springframework.messaging.MessageHeaders; | |
import org.springframework.messaging.support.MessageBuilder; | |
import org.springframework.stereotype.Component; | |
import java.util.List; | |
@Slf4j | |
@Component | |
public class EventHandlerImpl implements EventHandler { | |
private final MessageService messageService; | |
private final EventAutoConfig.MessageSync messageSync; | |
@Autowired | |
public EventHandlerImpl(final MessageService messageService, final EventAutoConfig.MessageSync messageSync) { | |
this.messageService = messageService; | |
this.messageSync = messageSync; | |
} | |
/* EVENT BRIDGING: The following @ServiceActivator methods move events from multiple channels to a single channel. */ | |
/* | |
* Provide a @ServiceActivator for each input channel that your Aggregator requires an event to be read. | |
* Each ServiceActivator must bridge that event to the same common aggregation channel, as the Aggregator | |
* can only read from a single channel to perform its correlation. | |
*/ | |
@ServiceActivator(inputChannel = EventChannels.RATEABLE_QUOTE_IN, outputChannel = EventChannels.AUTO_PRICING_AGGREGATE_OUT, autoStartup = "true") | |
@Override | |
public Message<?> rateableQuoteBridge(Message<?> message) { | |
return channelBridge(message, EventChannels.RATEABLE_QUOTE_IN); | |
} | |
@ServiceActivator(inputChannel = EventChannels.RISK_ACCEPTED_IN, outputChannel = EventChannels.AUTO_PRICING_AGGREGATE_OUT, autoStartup = "true") | |
@Override | |
public Message<?> riskAcceptedBridge(Message<?> message) { | |
return channelBridge(message, EventChannels.RISK_ACCEPTED_IN); | |
} | |
@ServiceActivator(inputChannel = EventChannels.RATE_PROVISIONED_IN, outputChannel = EventChannels.AUTO_PRICING_AGGREGATE_OUT, autoStartup = "true") | |
@Override | |
public Message<?> rateProvisionedBridge(Message<?> message) { | |
return channelBridge(message, EventChannels.RATE_PROVISIONED_IN); | |
} | |
private Message<?> channelBridge(Message<?> message, String channelName) { | |
log.debug("Received event from {} channel with processId {}, placing it on the aggregator channel.", channelName, String.valueOf(message.getHeaders().get(EventHandler.PROCESS_ID))); | |
// Update each incoming bridged message with an additional header that can be used to determine event type during aggregation. | |
return MessageBuilder.withPayload(message.getPayload()).copyHeaders(message.getHeaders()).setHeader(EVENT_TYPE_HEADER, channelName).build(); | |
} | |
/* EVENT AGGREGATION: The following @CorrelationStrategy, @ReleaseStrategy, and @Aggregator combine to read | |
* multiple events from a single channel and perform business logic to publish a resulting output event once a set of criteria are met. | |
*/ | |
/* This identifies a message's correlation key. In this implementation, we're using a message header `processId`, | |
* but this could perform event deserialization to pull data from the message body if more complex correlation is needed. | |
*/ | |
@CorrelationStrategy | |
@Override | |
public Object correlateBy(Message message) { | |
log.debug("Correlating message with id: {}", message.getHeaders().get(EventHandler.PROCESS_ID)); | |
// extract 'processId' message header | |
return message.getHeaders().get(EventHandler.PROCESS_ID); | |
} | |
/** | |
* This checks to see when a collection of messages with the same | |
* correlation key is ready for aggregation logic. In this implementation, | |
* we check that one message from each input channel has been received. | |
*/ | |
@ReleaseStrategy | |
@Override | |
public boolean isComplete(List<Message<?>> messages) { | |
boolean unrateableQuote = false; | |
boolean riskAccepted = false; | |
boolean rateProvisioned = false; | |
try { | |
for (Message<?> message : messages) { | |
MessageHeaders headers = message.getHeaders(); | |
String eventType = headers.get(EVENT_TYPE_HEADER).toString(); | |
if (eventType.equalsIgnoreCase(EventChannels.RATEABLE_QUOTE_IN)) { | |
RateableQuoteEvent event = messageService.parse(message, RateableQuoteEvent.class); | |
if (event.getType().equals(RateableQuoteEventType.UNRATEABLE_QUOTE)) { | |
unrateableQuote = true; | |
} | |
} else if (eventType.equalsIgnoreCase(EventChannels.RISK_ACCEPTED_IN)) { | |
RiskAcceptedEvent event = messageService.parse(message, RiskAcceptedEvent.class); | |
if (event.getType().equals(RiskAcceptedEventType.UNDERWRITING_RISK_ACCEPTED)) { | |
riskAccepted = true; | |
} | |
} else if (eventType.equalsIgnoreCase(EventChannels.RATE_PROVISIONED_IN)) { | |
RateProvisionedEvent event = messageService.parse(message, RateProvisionedEvent.class); | |
if (event.getType().equals(RateProvisionedEventType.RATE_PROVISIONED)) { | |
rateProvisioned = true; | |
} | |
} | |
} | |
} catch (Exception e) { | |
log.error(e.getMessage(), e); | |
} | |
return unrateableQuote || (riskAccepted && rateProvisioned); | |
} | |
/** | |
* This method takes a completed message group and calls the underlying domain service | |
* to perform required business logic to produce the output event. | |
*/ | |
@Aggregator(inputChannel = EventChannels.AUTO_PRICING_AGGREGATE_OUT, outputChannel = EventChannels.AUTO_PRICING_AGGREGATE_OUT) | |
@Override | |
public Message<?> aggregate(List<Message<?>> messages) { | |
try { | |
RateableQuoteEvent unrateableQuoteEvent = null; | |
RiskAcceptedEvent riskAcceptedEvent = null; | |
RateProvisionedEvent rateProvisionedEvent = null; | |
// Pull out the event of each type for use in calling the domain service. | |
// In the case of duplicate messages, this will take the last instance of a message from a given channel. | |
for (Message<?> message : messages) { | |
String eventType = message.getHeaders().get(EVENT_TYPE_HEADER).toString(); | |
if (eventType.equalsIgnoreCase(EventChannels.RATEABLE_QUOTE_IN)) { | |
RateableQuoteEvent event = messageService.parse(message, RateableQuoteEvent.class); | |
if (event.getType().equals(RateableQuoteEventType.UNRATEABLE_QUOTE)) { | |
unrateableQuoteEvent = event; | |
} | |
} else if (eventType.equalsIgnoreCase(EventChannels.RISK_ACCEPTED_IN)) { | |
RiskAcceptedEvent event = messageService.parse(message, RiskAcceptedEvent.class); | |
if (event.getType().equals(RiskAcceptedEventType.UNDERWRITING_RISK_ACCEPTED)) { | |
riskAcceptedEvent = event; | |
} | |
} else if (eventType.equalsIgnoreCase(EventChannels.RATE_PROVISIONED_IN)) { | |
RateProvisionedEvent event = messageService.parse(message, RateProvisionedEvent.class); | |
if (event.getType().equals(RateProvisionedEventType.RATE_PROVISIONED)) { | |
rateProvisionedEvent = event; | |
} | |
} | |
} | |
//get first event and extract processId since events are correlated already | |
String processId = String.valueOf(messages.get(0).getHeaders().get(EventHandler.PROCESS_ID)); | |
if (unrateableQuoteEvent != null) { | |
messageSync.put(processId, unrateableQuoteEvent); | |
} else { | |
messageSync.put(processId, Pair.of(riskAcceptedEvent, rateProvisionedEvent)); | |
} | |
return MessageBuilder.withPayload("OK") | |
.setHeader(EventHandler.PROCESS_ID, "SUCCESS") | |
.setHeader(EventHandler.EVENT_TYPE_HEADER, "successChannel") //this causes the aggregator methods to ignore this message during grouping | |
.build(); | |
} catch (Exception e) { | |
log.error(e.getMessage(), e); | |
} | |
return null; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment