Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save paulfairless/81eb87bd5a0e1b3a6d048ee913efc79a to your computer and use it in GitHub Desktop.
Save paulfairless/81eb87bd5a0e1b3a6d048ee913efc79a to your computer and use it in GitHub Desktop.
Specify pool for spring-cloud-aws SQS listener
package com.endource.aws
import com.amazonaws.services.sqs.AmazonSQSAsync
import com.amazonaws.services.sqs.model.Message
import com.amazonaws.services.sqs.model.MessageAttributeValue
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.cloud.aws.messaging.config.SimpleMessageListenerContainerFactory
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener
import org.springframework.context.annotation.Bean
import org.springframework.core.task.AsyncTaskExecutor
import org.springframework.messaging.handler.annotation.Headers
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import org.springframework.stereotype.Component
/**
 * SQS listener for purchase-link update messages that also supplies the
 * {@link SimpleMessageListenerContainerFactory} (and its dedicated thread pool)
 * used by the spring-cloud-aws listener container.
 *
 * <p>The component is conditional on the {@code PURCHASELINK_UPDATES_QUEUE_URL}
 * property, while the listener itself reads the queue URL from the JVM system
 * properties via SpEL.
 * NOTE(review): a value supplied only through an environment variable or
 * application properties would satisfy the condition but presumably not the
 * SpEL lookup - confirm how the value is provided at deploy time.
 */
@Component
@ConditionalOnProperty('PURCHASELINK_UPDATES_QUEUE_URL')
class PurchaseLinkUpdateListener {

    private static final Logger log = LoggerFactory.getLogger(PurchaseLinkUpdateListener.class)

    @Autowired
    private PurchaseLinkUpdateService purchaseLinkUpdateService

    /**
     * Handles a single message received from the purchase-link updates queue.
     *
     * <p>An {@link UnsupportedSchemaException} is logged (with its stack trace)
     * and not rethrown, so the method still completes normally. With
     * {@code SqsMessageDeletionPolicy.ON_SUCCESS} that is expected to cause the
     * message to be deleted rather than redelivered - see the spring-cloud-aws
     * documentation for the exact deletion semantics.
     *
     * @param headers        the message headers injected by the messaging framework
     * @param rawJsonMessage the raw message payload
     */
    @SuppressWarnings("unused")
    @SqsListener(value = '#{systemProperties.PURCHASELINK_UPDATES_QUEUE_URL}', deletionPolicy = SqsMessageDeletionPolicy.ON_SUCCESS)
    public void consumeSqsMessage(@Headers Map<String, String> headers, String rawJsonMessage) {
        try {
            // do something
        } catch (UnsupportedSchemaException e) {
            // Pass the exception to the logger so the cause of the discard is not lost.
            log.error("Received unsupported schema in message - discarding.", e)
        }
    }

    /**
     * Builds the listener container factory with a custom task executor instead
     * of the one the framework would create by default.
     *
     * @param amazonSqs the asynchronous SQS client the containers will use
     * @return a factory that auto-starts its containers, requests up to 10
     *         messages at a time and runs on {@link #createDefaultTaskExecutor()}
     */
    @Bean
    public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory(AmazonSQSAsync amazonSqs) {
        SimpleMessageListenerContainerFactory factory = new SimpleMessageListenerContainerFactory()
        factory.setAmazonSqs(amazonSqs)
        factory.setAutoStartup(true)
        factory.setMaxNumberOfMessages(10)
        factory.setTaskExecutor(createDefaultTaskExecutor())
        return factory
    }

    /**
     * Creates and initialises the thread pool used by the listener container.
     *
     * <p>The executor is created directly and initialised by hand through
     * {@code afterPropertiesSet()} instead of being declared as a bean.
     * NOTE(review): nothing in this class shuts the executor down - confirm
     * whether the container or application shutdown takes care of it.
     *
     * @return a fixed-size pool of 10 threads with a task queue of capacity 2
     */
    protected AsyncTaskExecutor createDefaultTaskExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor()
        threadPoolTaskExecutor.setThreadNamePrefix("PurchaseLinkUpdateExecutor - ")
        threadPoolTaskExecutor.setCorePoolSize(10)
        threadPoolTaskExecutor.setMaxPoolSize(10)
        // Keep the executor queue very small to avoid retaining messages too long in memory.
        // NOTE(review): the capacity of 2 presumably leaves headroom for the container's own
        // polling task when a full batch of 10 messages is dispatched - confirm before changing.
        threadPoolTaskExecutor.setQueueCapacity(2)
        // Not a Spring-managed bean, so the initialisation callback is invoked manually.
        threadPoolTaskExecutor.afterPropertiesSet()
        return threadPoolTaskExecutor
    }
}
@trashface
Copy link

Thank you

@jackmahoney
Copy link

danke

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment