Skip to content

Instantly share code, notes, and snippets.

@ghsatpute
Last active July 28, 2018 07:37
Show Gist options
  • Save ghsatpute/1201595be35800cbb18f5c9763e0f168 to your computer and use it in GitHub Desktop.
Save ghsatpute/1201595be35800cbb18f5c9763e0f168 to your computer and use it in GitHub Desktop.
SQS Send and Receive using Spring Cloud
import com.amazonaws.regions.Regions;
import com.amazonaws.services.cloudformation.AmazonCloudFormationAsync;
import com.amazonaws.services.cloudformation.AmazonCloudFormationAsyncClientBuilder;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import org.springframework.cloud.aws.messaging.config.annotation.EnableSqs;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/*
* Created by Ganesh Satpute on 25/7/18 2:45 PM.
*/
@Configuration
@EnableSqs
public class AwsConfig {
@Bean
public AmazonSQS amazonSQSAsync() {
return AmazonSQSAsyncClientBuilder.standard()
.withRegion(Regions.US_WEST_1) // TODO: Load region from configuration
.build();
}
@Value(value = "${queue-name}")
private String sqsInstanceActionQueue = null;
@Bean
public QueueMessagingTemplate amazonSQSAsync() {
AmazonSQSAsync sqsAsync = AmazonSQSAsyncClientBuilder.standard()
.withRegion(Regions.US_WEST_1) // TODO: Configurations should be load from file
.build();
QueueMessagingTemplate queueMessagingTemplate = new QueueMessagingTemplate(sqsAsync);
queueMessagingTemplate.setDefaultDestinationName(sqsInstanceActionQueue);
return queueMessagingTemplate;
}
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy;
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import java.util.Map;
@Service
public class SqsConsumer {
private static Logger LOGGER = LoggerFactory.getLogger(SqsConsumer.class);
@SqsListener(value = "${queue-name}", deletionPolicy =
SqsMessageDeletionPolicy.ON_SUCCESS)
private void processMessage(@Payload String payload, @Headers Map<String, String> headers) {
LOGGER.info("Received message: '{}'", payload);
// Your business logic here
}
}
import com.fasterxml.jackson.core.JsonProcessingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.cloud.aws.messaging.core.SqsMessageHeaders;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/*
* Created by Ganesh Satpute on 27/7/18 5:53 PM.
*/
@Component
public class SqsProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(SqsProducer.class);
@Autowired
private QueueMessagingTemplate queueMessagingTemplate = null;
@Value(value = "${queue-name}")
private String sqsInstanceActionQueue = null;
@Scheduled(fixedDelay = 15000)
public void postInstanceActionOnSqs() throws JsonProcessingException {
LOGGER.debug("Starting processing of pending instance actions");
String payload = "MyPayloadObject";
Map<String, Object> headers = new HashMap<>();
headers.put(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER, "some-unique-id");
headers.put(SqsMessageHeaders.SQS_GROUP_ID_HEADER, "demo-poc-provisioning");
queueMessagingTemplate.convertAndSend(sqsInstanceActionQueue, payload, headers);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment