Last active
July 28, 2018 07:37
-
-
Save ghsatpute/1201595be35800cbb18f5c9763e0f168 to your computer and use it in GitHub Desktop.
SQS Send and Receive using Spring Cloud
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.amazonaws.regions.Regions;
import com.amazonaws.services.cloudformation.AmazonCloudFormationAsync;
import com.amazonaws.services.cloudformation.AmazonCloudFormationAsyncClientBuilder;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.aws.messaging.config.annotation.EnableSqs;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/* | |
* Created by Ganesh Satpute on 25/7/18 2:45 PM. | |
*/ | |
@Configuration | |
@EnableSqs | |
public class AwsConfig { | |
@Bean | |
public AmazonSQS amazonSQSAsync() { | |
return AmazonSQSAsyncClientBuilder.standard() | |
.withRegion(Regions.US_WEST_1) // TODO: Load region from configuration | |
.build(); | |
} | |
@Value(value = "${queue-name}") | |
private String sqsInstanceActionQueue = null; | |
@Bean | |
public QueueMessagingTemplate amazonSQSAsync() { | |
AmazonSQSAsync sqsAsync = AmazonSQSAsyncClientBuilder.standard() | |
.withRegion(Regions.US_WEST_1) // TODO: Configurations should be load from file | |
.build(); | |
QueueMessagingTemplate queueMessagingTemplate = new QueueMessagingTemplate(sqsAsync); | |
queueMessagingTemplate.setDefaultDestinationName(sqsInstanceActionQueue); | |
return queueMessagingTemplate; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.cloud.aws.messaging.listener.SqsMessageDeletionPolicy; | |
import org.springframework.cloud.aws.messaging.listener.annotation.SqsListener; | |
import org.springframework.messaging.handler.annotation.Headers; | |
import org.springframework.messaging.handler.annotation.Payload; | |
import org.springframework.stereotype.Service; | |
import java.util.Map; | |
@Service | |
public class SqsConsumer { | |
private static Logger LOGGER = LoggerFactory.getLogger(SqsConsumer.class); | |
@SqsListener(value = "${queue-name}", deletionPolicy = | |
SqsMessageDeletionPolicy.ON_SUCCESS) | |
private void processMessage(@Payload String payload, @Headers Map<String, String> headers) { | |
LOGGER.info("Received message: '{}'", payload); | |
// Your business logic here | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.fasterxml.jackson.core.JsonProcessingException; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.beans.factory.annotation.Value; | |
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate; | |
import org.springframework.cloud.aws.messaging.core.SqsMessageHeaders; | |
import org.springframework.scheduling.annotation.Scheduled; | |
import org.springframework.stereotype.Component; | |
import java.util.HashMap; | |
import java.util.Map; | |
/* | |
* Created by Ganesh Satpute on 27/7/18 5:53 PM. | |
*/ | |
@Component | |
public class SqsProducer { | |
private static final Logger LOGGER = LoggerFactory.getLogger(SqsProducer.class); | |
@Autowired | |
private QueueMessagingTemplate queueMessagingTemplate = null; | |
@Value(value = "${queue-name}") | |
private String sqsInstanceActionQueue = null; | |
@Scheduled(fixedDelay = 15000) | |
public void postInstanceActionOnSqs() throws JsonProcessingException { | |
LOGGER.debug("Starting processing of pending instance actions"); | |
String payload = "MyPayloadObject"; | |
Map<String, Object> headers = new HashMap<>(); | |
headers.put(SqsMessageHeaders.SQS_DEDUPLICATION_ID_HEADER, "some-unique-id"); | |
headers.put(SqsMessageHeaders.SQS_GROUP_ID_HEADER, "demo-poc-provisioning"); | |
queueMessagingTemplate.convertAndSend(sqsInstanceActionQueue, payload, headers); | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment