Skip to content

Instantly share code, notes, and snippets.

@UnquietCode
Last active October 1, 2022 04:00
Show Gist options
  • Save UnquietCode/5717942 to your computer and use it in GitHub Desktop.
Save UnquietCode/5717942 to your computer and use it in GitHub Desktop.
Mock AWS SQS implementation which operatesin-memory rather than hitting the real SQS.
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.ResponseMetadata;
import com.amazonaws.regions.Region;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.*;
import com.google.common.hash.Hashing;
import java.util.*;
import java.util.concurrent.*;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
* Sitting in the airport, unable to connect to the internet, this seemed
* like a good use of my time, at least as compared with sleeping.
*
* @author Ben Fagin
* @version 2013-05-28
*/
public class MockSQS implements AmazonSQS {
private final Map<String, Queue<MessageInfo>> queues = new HashMap<>();
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
private int timeout = 35*60;
private final Map<String, ScheduledMessage> receivedMessages = new HashMap<>();
/*
- adds a message to the correct queue
- delays if required
*/
@Override
public SendMessageResult sendMessage(final SendMessageRequest request) throws AmazonServiceException, AmazonClientException {
final Queue<MessageInfo> queue = getOrCreateQueue(request.getQueueUrl());
final MessageInfo info = new MessageInfo();
info.body = checkNotNull(request.getMessageBody());
info.id = UUID.randomUUID().toString();
if (request.getDelaySeconds() == null) {
queue.add(info);
} else {
Runnable task = new Runnable() {
public void run() {
queue.add(info);
}
};
executor.schedule(task, request.getDelaySeconds(), TimeUnit.SECONDS);
}
return new SendMessageResult().withMessageId(info.id).withMD5OfMessageBody(info.hash());
}
/*
- takes messages off the queue
- if timeout, then they are added back
*/
@Override
public ReceiveMessageResult receiveMessage(ReceiveMessageRequest request) throws AmazonServiceException, AmazonClientException {
final Queue<MessageInfo> queue = getOrCreateQueue(request.getQueueUrl());
List<Message> messages = new ArrayList<>();
Integer max = request.getMaxNumberOfMessages();
if (max == null) { max = 0; }
checkArgument(max <= 10 && max > 0);
Integer visibilityTimeout = request.getVisibilityTimeout();
if (visibilityTimeout == null) { visibilityTimeout = timeout; }
for (int i=0; i < max; ++i) {
final MessageInfo info = queue.poll();
if (info != null) {
final String receiptHandle = UUID.randomUUID().toString();
Message message = new Message();
message.setBody(info.body);
message.setMessageId(info.id);
message.setMD5OfBody(info.hash());
message.setReceiptHandle(receiptHandle);
messages.add(message);
Runnable command = new Runnable() {
public void run() {
queue.add(info);
receivedMessages.remove(receiptHandle);
}
};
ScheduledMessage scheduled = new ScheduledMessage();
scheduled.future = executor.schedule(command, visibilityTimeout, TimeUnit.SECONDS);
scheduled.runnable = command;
receivedMessages.put(message.getReceiptHandle(), scheduled);
}
}
return new ReceiveMessageResult().withMessages(messages);
}
/*
- deletes the task which would have re-added a message to the queue,
effectively deleting the message
*/
@Override
public void deleteMessage(DeleteMessageRequest request) throws AmazonServiceException, AmazonClientException {
ScheduledMessage scheduled = receivedMessages.remove(request.getReceiptHandle());
if (scheduled == null) {
throw new RuntimeException("message does not exist");
}
scheduled.future.cancel(true);
}
@Override
public void changeMessageVisibility(ChangeMessageVisibilityRequest request) throws AmazonServiceException, AmazonClientException {
ScheduledMessage scheduled = receivedMessages.get(request.getReceiptHandle());
if (scheduled == null) {
throw new RuntimeException("message does not exist");
}
scheduled.future.cancel(true);
scheduled.future = executor.schedule(scheduled.runnable, checkNotNull(request.getVisibilityTimeout()), TimeUnit.SECONDS);
}
@Override
public void shutdown() {
executor.shutdown();
receivedMessages.clear();
queues.clear();
}
private static class MessageInfo {
String body;
String id;
String hash() {
return Hashing.md5().hashString(body).toString();
}
}
private static class ScheduledMessage {
ScheduledFuture future;
Runnable runnable;
}
private Queue<MessageInfo> getOrCreateQueue(String url) {
Queue<MessageInfo> queue = queues.get(checkNotNull(url));
if (queue == null) {
synchronized (queues) {
queue = queues.get(checkNotNull(url));
if (queue == null) {
queue = new ArrayDeque<>();
queues.put(url, queue);
}
}
}
return queue;
}
/*
- set the default timeout when receiving messages from the queue
*/
public void setTimeout(int timeout) {
this.timeout = timeout;
}
//---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---//
@Override
public void setEndpoint(String endpoint) throws IllegalArgumentException {
throw new RuntimeException("not implemented");
}
@Override
public void setRegion(Region region) throws IllegalArgumentException {
throw new RuntimeException("not implemented");
}
@Override
public void setQueueAttributes(SetQueueAttributesRequest setQueueAttributesRequest) throws AmazonServiceException, AmazonClientException {
throw new RuntimeException("not implemented");
}
@Override
public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) throws AmazonServiceException, AmazonClientException {
throw new RuntimeException("not implemented");
}
@Override
public GetQueueUrlResult getQueueUrl(GetQueueUrlRequest getQueueUrlRequest) throws AmazonServiceException, AmazonClientException {
throw new RuntimeException("not implemented");
}
@Override
public void removePermission(RemovePermissionRequest removePermissionRequest) throws AmazonServiceException, AmazonClientException {
throw new RuntimeException("not implemented");
}
@Override
public GetQueueAttributesResult getQueueAttributes(GetQueueAttributesRequest getQueueAttributesRequest) throws AmazonServiceException, AmazonClientException {
throw new RuntimeException("not implemented");
}
@Override
public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest sendMessageBatchRequest) throws AmazonServiceException, AmazonClientException {
throw new RuntimeException("not implemented");
}
@Override
public void deleteQueue(DeleteQueueRequest deleteQueueRequest) throws AmazonServiceException, AmazonClientException {
throw new RuntimeException("not implemented");
}
@Override
public ListQueuesResult listQueues(ListQueuesRequest listQueuesRequest) throws AmazonServiceException, AmazonClientException {
throw new RuntimeException("not implemented");
}
@Override
public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) throws AmazonServiceException, AmazonClientException {
throw new RuntimeException("not implemented");
}
@Override
public CreateQueueResult createQueue(CreateQueueRequest createQueueRequest) throws AmazonServiceException, AmazonClientException {
throw new RuntimeException("not implemented");
}
@Override
public void addPermission(AddPermissionRequest addPermissionRequest) throws AmazonServiceException, AmazonClientException {
throw new RuntimeException("not implemented");
}
@Override
public ListQueuesResult listQueues() throws AmazonServiceException, AmazonClientException {
throw new RuntimeException("not implemented");
}
@Override
public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
throw new RuntimeException("not implemented");
}
}
@orvyl
Copy link

orvyl commented Feb 28, 2017

Hi!

Just wanted to say THANK YOU VERY MUCH for this one. It really helps us big time. I created our own implementation https://gist.github.com/orvyl/870fba7ddbaeeb6ad651930a1c3c98aa because we are now using Java8 and aws-java-sdk v1.11.86 and it was 100% based into this one. Again, thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment