Skip to content

Instantly share code, notes, and snippets.

@rponte
Last active August 10, 2023 10:59
Show Gist options
  • Save rponte/2d881a3c531fc862183d7b3d716e2458 to your computer and use it in GitHub Desktop.
Save rponte/2d881a3c531fc862183d7b3d716e2458 to your computer and use it in GitHub Desktop.
Spring Boot: how to test @SqsListener via integration tests
##
# Application configuration
##
samples.aws.sqs.consumer-queue: customersCreatedQueue
##
# Spring Cloud AWS
##
cloud:
aws:
sqs:
listener:
auto-startup: false # does not start the listener on startup
queue-stop-timeout: 500 # when stopping the listener don't wait too much
package br.com.zup.edu.demo.samples.aws.sqs;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import io.awspring.cloud.core.env.ResourceIdResolver;
import io.awspring.cloud.messaging.listener.SimpleMessageListenerContainer;
import io.awspring.cloud.messaging.support.destination.DynamicQueueUrlDestinationResolver;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.core.CachingDestinationResolverProxy;
import org.springframework.messaging.core.DestinationResolver;
@TestConfiguration
public class SqsTestConfig {
/**
* Configures the SimpleMessageListenerContainer to auto-create a SQS Queue in case it does not exist.
*
* This is necessary because if the queue does not exist during startup the SimpleMessageListenerContainer
* stops working with the following warning message:
*
* > WARN [main] i.a.c.m.l.SimpleMessageListenerContainer:
* > Ignoring queue with name 'customersCreatedQueue': The queue does not exist.;
* > nested exception is com.amazonaws.services.sqs.model.QueueDoesNotExistException: The specified queue
* > does not exist for this wsdl version.
*/
@Bean
public BeanPostProcessor simpleMessageListenerContainerPostProcessor(DestinationResolver<String> destinationResolver) {
return new BeanPostProcessor() {
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof SimpleMessageListenerContainer container) {
container.setQueueStopTimeout(500);
container.setDestinationResolver(destinationResolver);
}
return bean;
}
};
}
/**
* Creates a DynamicQueueUrlDestinationResolver capable of auto-creating
* a SQS queue in case it does not exist
*/
@Bean
public DestinationResolver<String> autoCreateQueueDestinationResolver(
AmazonSQSAsync sqs,
@Autowired(required = false) ResourceIdResolver resourceIdResolver) {
DynamicQueueUrlDestinationResolver autoCreateQueueResolver
= new DynamicQueueUrlDestinationResolver(sqs, resourceIdResolver);
autoCreateQueueResolver.setAutoCreate(true);
return new CachingDestinationResolverProxy<>(autoCreateQueueResolver);
}
}
package br.com.zup.edu.demo.samples.aws.sqs;
import br.com.zup.edu.demo.samples.aws.sqs.model.Customer;
import br.com.zup.edu.demo.samples.aws.sqs.model.CustomerRepository;
import io.awspring.cloud.messaging.listener.SqsMessageDeletionPolicy;
import io.awspring.cloud.messaging.listener.annotation.SqsListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.MessageExceptionHandler;
import org.springframework.stereotype.Component;
import javax.validation.ConstraintViolationException;
import java.util.Map;
@Component
public class CustomerCreatedEventSqsListener {
private static final Logger LOGGER = LoggerFactory.getLogger(CustomerCreatedEventSqsListener.class);
private final CustomerRepository repository;
public CustomerCreatedEventSqsListener(CustomerRepository repository) {
this.repository = repository;
}
@SqsListener(
value = "${samples.aws.sqs.consumer-queue}",
deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS
)
public void receive(CustomerCreatedEvent event, @Header("MessageId") String messageId) {
LOGGER.info(
"Receiving a CustomerCreatedEvent (MessageId=\"{}\") from SQS queue: {}",
messageId, event
);
// converts to domain model and invokes your business logic
Customer customer = event.toModel();
repository.save(customer);
}
@MessageExceptionHandler(ConstraintViolationException.class)
public void handleOnError(ConstraintViolationException exception, @Headers Map<String, String> headers) {
LOGGER.error(
"It was not possible to consume the message with messageId={} (ApproximateReceiveCount ={}): {}",
headers.get("MessageId"),
headers.get("ApproximateReceiveCount"),
exception.getLocalizedMessage()
);
}
}
package br.com.zup.edu.demo.samples.aws.sqs;
import br.com.zup.edu.demo.samples.aws.sqs.model.CustomerRepository;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import com.amazonaws.services.sqs.model.PurgeQueueRequest;
import io.awspring.cloud.messaging.core.QueueMessagingTemplate;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Import;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.TestPropertySource;
import java.time.LocalDateTime;
import java.util.UUID;
import static java.util.List.of;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
@SpringBootTest
@ActiveProfiles("test")
@Import(SqsTestConfig.class)
/**
* Starts the listener on startup, and stops the listener after all tests
* (the @DirtiesContext annotation closes this context)
*/
@DirtiesContext
@TestPropertySource(properties = {
"cloud.aws.sqs.listener.auto-startup = true"
})
class CustomerCreatedEventSqsListenerTest {
@Autowired
private QueueMessagingTemplate sqsTemplate;
@Autowired
private AmazonSQSAsync SQS;
@Value("${samples.aws.sqs.consumer-queue}")
private String consumerQueueName;
@Autowired
private CustomerRepository repository;
@BeforeEach
public void setUp() {
repository.deleteAll();
SQS.purgeQueue(new PurgeQueueRequest(consumerQueueName));
}
@Test
@DisplayName("should consume an event from SQS queue")
public void t1() {
// scenario
CustomerCreatedEvent event = new CustomerCreatedEvent(
UUID.randomUUID(),
"Rafael Ponte",
"+5585988776655",
LocalDateTime.now()
);
sqsTemplate
.convertAndSend(consumerQueueName, event);
// action
// ...is async, so it will be performed by our SQS listener
// validation
await().atMost(3, SECONDS).untilAsserted(() -> {
assertThat(numberOfMessagesInQueue()).isEqualTo(0);
assertThat(numberOfMessagesNotVisibleInQueue()).isEqualTo(0);
assertThat(repository.findAll())
.hasSize(1)
.usingRecursiveFieldByFieldElementComparator()
.containsExactly(event.toModel());
});
}
@Test
@DisplayName("should not consume an event from SQS queue when the event is invalid")
public void t2() {
// scenario
CustomerCreatedEvent invalidEvent = new CustomerCreatedEvent(
UUID.randomUUID(), null, null, null
);
sqsTemplate
.convertAndSend(consumerQueueName, invalidEvent);
// action
// ...is async, so it will be performed by our SQS listener
// validation
await().atMost(3, SECONDS).untilAsserted(() -> {
assertThat(repository.count()).isEqualTo(0);
assertThat(numberOfMessagesInQueue()).isEqualTo(0);
assertThat(numberOfMessagesNotVisibleInQueue()).isEqualTo(1); // messages with errors stay not-visible for 30s
});
}
private Integer numberOfMessagesInQueue() {
GetQueueAttributesResult attributes = SQS
.getQueueAttributes(consumerQueueName, of("All"));
return Integer.parseInt(
attributes.getAttributes().get("ApproximateNumberOfMessages")
);
}
private Integer numberOfMessagesNotVisibleInQueue() {
GetQueueAttributesResult attributes = SQS
.getQueueAttributes(consumerQueueName, of("All"));
return Integer.parseInt(
attributes.getAttributes().get("ApproximateNumberOfMessagesNotVisible")
);
}
}
package br.com.zup.edu.demo.samples.aws.sqs;
import br.com.zup.edu.demo.samples.aws.sqs.model.CustomerRepository;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.model.GetQueueAttributesResult;
import com.amazonaws.services.sqs.model.PurgeQueueRequest;
import io.awspring.cloud.messaging.core.QueueMessagingTemplate;
import io.awspring.cloud.messaging.listener.SimpleMessageListenerContainer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.ActiveProfiles;
import java.time.LocalDateTime;
import java.util.UUID;
import static java.util.List.of;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
@SpringBootTest
@ActiveProfiles("test")
@Import(SqsTestConfig.class)
class CustomerCreatedEventSqsListenerTest {
@Autowired
private QueueMessagingTemplate sqsTemplate;
@Autowired
private AmazonSQSAsync SQS;
@Value("${samples.aws.sqs.consumer-queue}")
private String consumerQueueName;
/**
* We inject the listener here because we must start and stop the listener on each test
*/
@Autowired
private SimpleMessageListenerContainer consumerInBackground;
@Autowired
private CustomerRepository repository;
@BeforeEach
public void setUp() {
repository.deleteAll();
SQS.purgeQueue(new PurgeQueueRequest(consumerQueueName));
}
@AfterEach
public void cleanUp() {
// stop the listener after each test
consumerInBackground.stop(consumerQueueName);
}
@Test
@DisplayName("should consume an event from SQS queue")
public void t1() {
// scenario
CustomerCreatedEvent event = new CustomerCreatedEvent(
UUID.randomUUID(),
"Rafael Ponte",
"+5585988776655",
LocalDateTime.now()
);
sqsTemplate
.convertAndSend(consumerQueueName, event);
// action
// ...is async, so it will be performed by our SQS listener
consumerInBackground.start(consumerQueueName); // starts the listener
// validation
await().atMost(3, SECONDS).untilAsserted(() -> {
assertThat(numberOfMessagesInQueue()).isEqualTo(0);
assertThat(numberOfMessagesNotVisibleInQueue()).isEqualTo(0);
assertThat(repository.findAll())
.hasSize(1)
.usingRecursiveFieldByFieldElementComparator()
.containsExactly(event.toModel());
});
}
@Test
@DisplayName("should not consume an event from SQS queue when the event is invalid")
public void t2() {
// scenario
CustomerCreatedEvent invalidEvent = new CustomerCreatedEvent(
UUID.randomUUID(), null, null, null
);
sqsTemplate
.convertAndSend(consumerQueueName, invalidEvent);
// action
// ...is async, so it will be performed by our SQS listener
consumerInBackground.start(consumerQueueName); // starts the listener
// validation
await().atMost(3, SECONDS).untilAsserted(() -> {
assertThat(repository.count()).isEqualTo(0);
assertThat(numberOfMessagesInQueue()).isEqualTo(0);
assertThat(numberOfMessagesNotVisibleInQueue()).isEqualTo(1); // messages with errors stay not-visible for 30s
});
}
private Integer numberOfMessagesInQueue() {
GetQueueAttributesResult attributes = SQS
.getQueueAttributes(consumerQueueName, of("All"));
return Integer.parseInt(
attributes.getAttributes().get("ApproximateNumberOfMessages")
);
}
private Integer numberOfMessagesNotVisibleInQueue() {
GetQueueAttributesResult attributes = SQS
.getQueueAttributes(consumerQueueName, of("All"));
return Integer.parseInt(
attributes.getAttributes().get("ApproximateNumberOfMessagesNotVisible")
);
}
}
version: "3.8"
services:
localstack:
container_name: "${LOCALSTACK_DOCKER_NAME-localstack_main}"
image: localstack/localstack
ports:
- "127.0.0.1:4566:4566" # LocalStack Gateway
- "127.0.0.1:4510-4559:4510-4559" # external services port range
- "127.0.0.1:53:53" # DNS config (only required for Pro)
- "127.0.0.1:53:53/udp" # DNS config (only required for Pro)
- "127.0.0.1:443:443" # LocalStack HTTPS Gateway (only required for Pro)
environment:
- AWS_DEFAULT_REGION=sa-east-1
- AWS_ACCESS_KEY_ID=localstackAccessKeyId
- AWS_SECRET_ACCESS_KEY=localstackSecretAccessKey
- DEFAULT_REGION=sa-east-1
- USE_SINGLE_REGION=1
- SERVICES=${SERVICES-}
- EAGER_SERVICE_LOADING=0
- DEBUG=${DEBUG-}
- PERSISTENCE=${PERSISTENCE-}
- LAMBDA_EXECUTOR=${LAMBDA_EXECUTOR-}
- LOCALSTACK_API_KEY=${LOCALSTACK_API_KEY-1QKEfmFpC5} # only required for Pro
- DOCKER_HOST=unix:///var/run/docker.sock
volumes:
- "${LOCALSTACK_VOLUME_DIR:-./localstack}:/var/lib/localstack"
- "/var/run/docker.sock:/var/run/docker.sock"
@rponte
Copy link
Author

rponte commented Oct 27, 2022

this is a better version of the above code using TestContainers to start the Localstack up: https://gist.github.com/rponte/8a46133aeca05f07ae49035879a18143

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment