Skip to content

Instantly share code, notes, and snippets.

@rponte
Last active April 26, 2024 12:35
Show Gist options
  • Star 21 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save rponte/8a46133aeca05f07ae49035879a18143 to your computer and use it in GitHub Desktop.
Save rponte/8a46133aeca05f07ae49035879a18143 to your computer and use it in GitHub Desktop.
Spring Boot: Testing a @SqsListener with TestContainers and LocalStack
package br.com.zup.edu.app2.xxx.samples.aws.sqs;
import br.com.zup.edu.app2.xxx.samples.aws.sqs.model.Customer;
import br.com.zup.edu.app2.xxx.samples.aws.sqs.model.CustomerRepository;
import io.awspring.cloud.messaging.listener.SqsMessageDeletionPolicy;
import io.awspring.cloud.messaging.listener.annotation.SqsListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import javax.validation.ConstraintViolationException;
import java.util.Map;
/**
* Simple condition to avoid initializing this listener when running integration tests that don't
* care about it.
*
* Unfortunately, this is necessary because Spring Cloud tries to resolve the @SqsListener's queue URL
* on startup, and if there's no SQS server up and running it crashes the application.
*/
@ConditionalOnProperty(
name = "cloud.aws.sqs.listener.auto-startup", havingValue = "true"
)
@Component
public class CustomerCreatedEventSqsConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(CustomerCreatedEventSqsConsumer.class);
private final CustomerRepository repository;
public CustomerCreatedEventSqsConsumer(CustomerRepository repository) {
this.repository = repository;
}
@SqsListener(
value = "${samples.aws.sqs.consumer-queue}",
deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS
)
public void receive(CustomerCreatedEvent event, @Header("MessageId") String messageId) {
LOGGER.info(
"Receiving a CustomerCreatedEvent (MessageId=\"{}\") from SQS queue: {}",
messageId, event
);
// converts to domain model and invokes the business logic
Customer customer = event.toModel();
repository.save(customer);
}
/**
* This is how we can handle errors with @SqsListener, and as you can see, it's very
* similar to Controller Advices
*/
@MessageExceptionHandler({
ConstraintViolationException.class
})
public void handleOnError(ConstraintViolationException exception,
@Payload CustomerCreatedEvent event,
@Headers Map<String, String> headers) {
LOGGER.error(
"It was not possible to consume the message with messageId={} (ApproximateReceiveCount ={}): {}",
headers.get("MessageId"),
headers.get("ApproximateReceiveCount"),
event,
exception
);
// TODO: write your error handling logic here...
// TODO: also, you can annotate this method with @SendTo("myQueue-DLQ") to forward the message to another queue
}
}
package br.com.zup.edu.app2.xxx.samples.aws.sqs;
import br.com.zup.edu.app2.xxx.samples.aws.sqs.base.SqsIntegrationTest;
import br.com.zup.edu.app2.xxx.samples.aws.sqs.model.CustomerRepository;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import com.amazonaws.services.sqs.model.PurgeQueueRequest;
import io.awspring.cloud.messaging.core.QueueMessagingTemplate;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.test.context.TestPropertySource;
import java.time.LocalDateTime;
import java.util.UUID;
import static java.util.List.of;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
/**
* Here we start the listener on startup, and we guarantee that
* it is stopped in the end of all tests by closing the application context (thanks to @DirtiesContext)
*/
@TestPropertySource(properties = {
"cloud.aws.sqs.listener.auto-startup = true"
})
class CustomerCreatedEventSqsConsumerTest extends SqsIntegrationTest {
@Autowired
private QueueMessagingTemplate sqsTemplate;
@Autowired
private AmazonSQSAsync SQS;
@Value("${samples.aws.sqs.consumer-queue}")
private String consumerQueueName;
@Autowired
private CustomerRepository repository;
@BeforeEach
public void setUp() {
repository.deleteAll();
SQS.purgeQueue(new PurgeQueueRequest(consumerQueueName));
}
@Test
@DisplayName("should consume an event from SQS queue")
public void t1() {
// scenario
CustomerCreatedEvent event = new CustomerCreatedEvent(
UUID.randomUUID(),
"Rafael Ponte",
"+5585988776655",
LocalDateTime.now()
);
sqsTemplate
.convertAndSend(consumerQueueName, event);
// action
// ...is async, so it will be performed by our SQS listener
// validation
await().atMost(3, SECONDS).untilAsserted(() -> {
assertThat(numberOfMessagesInQueue()).isEqualTo(0);
assertThat(numberOfMessagesNotVisibleInQueue()).isEqualTo(0);
assertThat(repository.findAll())
.hasSize(1)
.usingRecursiveFieldByFieldElementComparator()
.containsExactly(event.toModel());
});
}
@Test
@DisplayName("should not consume an event from SQS queue when the event is invalid")
public void t2() {
// scenario
CustomerCreatedEvent invalidEvent = new CustomerCreatedEvent(
UUID.randomUUID(), null, null, null
);
sqsTemplate
.convertAndSend(consumerQueueName, invalidEvent);
// action
// ...is async, so it will be performed by our SQS listener
// validation
await().atMost(3, SECONDS).untilAsserted(() -> {
assertThat(repository.count()).isEqualTo(0);
assertThat(numberOfMessagesInQueue()).isEqualTo(0);
assertThat(numberOfMessagesNotVisibleInQueue()).isEqualTo(1); // messages with errors stay not-visible for 30s
});
}
private Integer numberOfMessagesInQueue() {
GetQueueAttributesResult attributes = SQS
.getQueueAttributes(consumerQueueName, of("All"));
return Integer.parseInt(
attributes.getAttributes().get("ApproximateNumberOfMessages")
);
}
private Integer numberOfMessagesNotVisibleInQueue() {
GetQueueAttributesResult attributes = SQS
.getQueueAttributes(consumerQueueName, of("All"));
return Integer.parseInt(
attributes.getAttributes().get("ApproximateNumberOfMessagesNotVisible")
);
}
}
package br.com.zup.edu.app2.xxx.samples.aws.sqs.base;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Import;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import static org.testcontainers.containers.localstack.LocalStackContainer.Service.SQS;
/**
* Base class responsible for starting Localstack and configuring it into the application
* before tests are executed
*/
@SpringBootTest
@ActiveProfiles("test")
@Import(SqsTestConfig.class)
@Testcontainers @DirtiesContext
public abstract class SqsIntegrationTest {
private static DockerImageName LOCALSTACK_IMAGE = DockerImageName.parse("localstack/localstack");
@Container
public static LocalStackContainer LOCALSTACK_CONTAINER = new LocalStackContainer(LOCALSTACK_IMAGE)
.withServices(SQS);
/**
* Just configures Localstack's SQS server endpoint in the application
*/
@DynamicPropertySource
static void registerProperties(DynamicPropertyRegistry registry) {
registry.add("cloud.aws.sqs.endpoint",
() -> LOCALSTACK_CONTAINER.getEndpointOverride(SQS).toString());
}
}
package br.com.zup.edu.app2.xxx.samples.aws.sqs.base;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import io.awspring.cloud.autoconfigure.messaging.SqsProperties;
import io.awspring.cloud.core.env.ResourceIdResolver;
import io.awspring.cloud.messaging.listener.SimpleMessageListenerContainer;
import io.awspring.cloud.messaging.support.destination.DynamicQueueUrlDestinationResolver;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.core.CachingDestinationResolverProxy;
import org.springframework.messaging.core.DestinationResolver;
@TestConfiguration
public class SqsTestConfig {
@Autowired
private SqsProperties sqsProperties;
/**
* Configures the SimpleMessageListenerContainer to auto-create a SQS Queue in case it does not exist.
*
* This is necessary because if the queue does not exist during startup the SimpleMessageListenerContainer
* stops working with the following warning message:
*
* > WARN [main] i.a.c.m.l.SimpleMessageListenerContainer:
* > Ignoring queue with name 'customersCreatedQueue': The queue does not exist.;
* > nested exception is com.amazonaws.services.sqs.model.QueueDoesNotExistException: The specified queue
* > does not exist for this wsdl version.
*/
@Bean
public BeanPostProcessor simpleMessageListenerContainerPostProcessor(DestinationResolver<String> destinationResolver) {
return new BeanPostProcessor() {
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof SimpleMessageListenerContainer container) {
container.setDestinationResolver(destinationResolver);
}
return bean;
}
};
}
/**
* Creates a DynamicQueueUrlDestinationResolver capable of auto-creating
* a SQS queue in case it does not exist
*/
@Bean
public DestinationResolver<String> autoCreateQueueDestinationResolver(
AmazonSQSAsync sqs,
@Autowired(required = false) ResourceIdResolver resourceIdResolver) {
DynamicQueueUrlDestinationResolver autoCreateQueueResolver
= new DynamicQueueUrlDestinationResolver(sqs, resourceIdResolver);
autoCreateQueueResolver.setAutoCreate(true);
return new CachingDestinationResolverProxy<>(autoCreateQueueResolver);
}
}
##
# Spring Cloud AWS
# https://docs.awspring.io/spring-cloud-aws/docs/current/reference/html/appendix.html
##
cloud:
aws:
stack:
auto: false
enabled: false
credentials:
access-key: localstackAccessKeyId
secret-key: localstackSecretAccessKey
region:
static: sa-east-1
sqs:
endpoint: http://localhost:4566
listener:
auto-startup: true
fail-on-missing-queue: false
default-deletion-policy: NO_REDRIVE
max-number-of-messages: 10
visibility-timeout: 30 # 30s
wait-timeout: 20 # 20s
back-off-time: 10000 # 10s
logging:
level:
io.awspring.cloud.messaging.core: info
io.awspring.cloud.messaging.listener: info
com.amazonaws.services.sqs: debug
# Samples configuration
samples.aws.sqs.consumer-queue: customersCreatedQueue
##
# Spring Cloud AWS
##
cloud:
aws:
sqs:
listener:
auto-startup: false
queue-stop-timeout: 500
@brewagebear
Copy link

It's a very cool example Thank you.

@Aleks-Becker
Copy link

thanks for sharing

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment