Skip to content

Instantly share code, notes, and snippets.

@fastnsilver
Created May 1, 2017 17:32
Show Gist options
  • Save fastnsilver/20209f7bcdeaa5084b72a5e8857b66fd to your computer and use it in GitHub Desktop.
Save fastnsilver/20209f7bcdeaa5084b72a5e8857b66fd to your computer and use it in GitHub Desktop.
Working w/ Spring Cloud and SQS
import java.util.List;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.json.JSONArray;
@Getter
@NoArgsConstructor(access = AccessLevel.PACKAGE)
@AllArgsConstructor(access = AccessLevel.PACKAGE)
@Builder
public class AwsMetricCollectionRequest implements HasResourceType {
private String accountNumber;
private String region;
private String iamRoleArn;
private String resourceType;
private List<String> tagNames;
private JSONArray resource;
public static AwsMetricCollectionRequest of(AwsSurveillanceRequest origin, JSONArray resource) {
return AwsMetricCollectionRequest
.builder()
.accountNumber(origin.getAccountNumber())
.region(origin.getRegion())
.iamRoleArn(origin.getIamRoleArn())
.resourceType(origin.getResourceType())
.tagNames(origin.getTagNames())
.resource(resource)
.build();
}
}
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
@Slf4j
public class AwsMetricCollectorMessageSender<P extends HasResourceType> implements MessageSender<P> {
private final QueueMessagingTemplate queueMessagingTemplate;
private final String queueName;
public AwsMetricCollectorMessageSender(String queueName, QueueMessagingTemplate queueMessagingTemplate) {
this.queueMessagingTemplate = queueMessagingTemplate;
this.queueName = queueName;
}
@Override
public void send(Message<P> message) {
Assert.notNull(message.getPayload(), "Cannot send metric collection request with a null payload!");
log.debug("Sent message! queueName={}, headers={}, payload={}", queueName, message.getHeaders().toString(),
message.getPayload().toString());
queueMessagingTemplate.convertAndSend(queueName, message.getPayload(), message.getHeaders());
}
}
public interface HasResourceType {
String getResourceType();
}
@Override
@SqsListener(value = "${sqs.inbound.name}", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void onMessage(@Payload String payload, Acknowledgment ack, @Headers MessageHeaders headers) {
Assert.notNull(payload, "Cannot process a null metric collection request!");
Assert.notNull(headers, "Message headers cannot be null");
Assert.notEmpty(headers, "Message headers cannot be empty");
log.info("Received metric collection request! payload={}", payload);
try {
onMessageInternal(mapper.readValue(payload, AwsMetricCollectionRequest.class), ack, headers);
} catch (IOException e) {
log.warn("Corrupt payload! Was expecting an AwsMetricCollectionRequest.");
ack.acknowledge();
}
}
public interface MessageSender<P extends HasResourceType> {
void send(Message<P> message);
}
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.cloud.aws.core.env.ResourceIdResolver;
import org.springframework.cloud.aws.messaging.config.annotation.EnableSqs;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.cloud.aws.messaging.listener.QueueMessageHandler;
import org.springframework.cloud.aws.messaging.listener.support.AcknowledgmentHandlerMethodArgumentResolver;
import org.springframework.cloud.aws.messaging.support.NotificationMessageArgumentResolver;
import org.springframework.cloud.aws.messaging.support.NotificationSubjectArgumentResolver;
import org.springframework.cloud.aws.messaging.support.converter.ObjectMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.SimpleMessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver;
import org.springframework.messaging.handler.annotation.support.HeadersMethodArgumentResolver;
import org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.validation.Errors;
import org.springframework.validation.Validator;
@Configuration
@EnableSqs
class SqsConfig {
@Autowired
private QueueMessagingTemplate queueMessagingTemplate;
@Bean
public QueueMessagingTemplate queueMessagingTemplate(
AmazonSQSAsync amazonSqs, ResourceIdResolver resourceIdResolver, ObjectMapper objectMapper) {
return new QueueMessagingTemplate(
amazonSqs, resourceIdResolver, createJacksonConverter(objectMapper));
}
@Bean("inboundMessageSender")
public MessageSender<AwsMetricCollectionRequest> outboundMessageSender(
@Value("${sqs.inbound.name}") String queueName) {
return new AwsMetricCollectorMessageSender<>(queueName, queueMessagingTemplate);
}
// @see https://github.com/spring-cloud/spring-cloud-aws/issues/193
@Bean
public QueueMessageHandlerPostProcessor queueMessageHandlerPostProcessor(ObjectMapper objectMapper) {
return new QueueMessageHandlerPostProcessor(getMethodArgumentResolvers(objectMapper));
}
private List<HandlerMethodArgumentResolver> getMethodArgumentResolvers(ObjectMapper objectMapper) {
MessageConverter jacksonMessageConverter = createJacksonConverter(objectMapper);
MessageConverter compositeMessageConverter = createCompositeConverter(objectMapper);
return
Arrays.asList(
new HeaderMethodArgumentResolver(null, null),
new HeadersMethodArgumentResolver(),
new NotificationSubjectArgumentResolver(),
new AcknowledgmentHandlerMethodArgumentResolver("Acknowledgment"),
new NotificationMessageArgumentResolver(compositeMessageConverter),
new PayloadArgumentResolver(jacksonMessageConverter, new NoOpValidator()));
}
private MessageConverter createJacksonConverter(ObjectMapper objectMapper) {
MappingJackson2MessageConverter jacksonMessageConverter = new MappingJackson2MessageConverter();
jacksonMessageConverter.setObjectMapper(objectMapper);
jacksonMessageConverter.setSerializedPayloadClass(String.class);
jacksonMessageConverter.setStrictContentTypeMatch(true);
return jacksonMessageConverter;
}
private MessageConverter createCompositeConverter(ObjectMapper objectMapper) {
List<MessageConverter> payloadArgumentConverters = new ArrayList<>();
payloadArgumentConverters.add(createJacksonConverter(objectMapper));
StringMessageConverter stringMessageConverter = new StringMessageConverter();
stringMessageConverter.setSerializedPayloadClass(String.class);
payloadArgumentConverters.add(stringMessageConverter);
ObjectMessageConverter objectMessageConverter = new ObjectMessageConverter();
objectMessageConverter.setStrictContentTypeMatch(true);
payloadArgumentConverters.add(objectMessageConverter);
payloadArgumentConverters.add(new SimpleMessageConverter());
return new CompositeMessageConverter(payloadArgumentConverters);
}
private static final class NoOpValidator implements Validator {
@Override
public boolean supports(Class<?> clazz) {
return false;
}
@Override
public void validate(Object target, Errors errors) {
}
}
private static final class QueueMessageHandlerPostProcessor implements BeanPostProcessor {
private List<HandlerMethodArgumentResolver> argumentResolvers;
public QueueMessageHandlerPostProcessor(List<HandlerMethodArgumentResolver> argumentResolvers) {
this.argumentResolvers = argumentResolvers;
}
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof QueueMessageHandler) {
((QueueMessageHandler) bean).setArgumentResolvers(argumentResolvers);
}
return bean;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment