Skip to content

Instantly share code, notes, and snippets.

@UnquietCode
Last active October 1, 2022 04:00
Show Gist options
  • Star 10 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save UnquietCode/5717942 to your computer and use it in GitHub Desktop.
Save UnquietCode/5717942 to your computer and use it in GitHub Desktop.
Mock AWS SQS implementation which operates in-memory rather than hitting the real SQS.
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.ResponseMetadata;
import com.amazonaws.regions.Region;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.*;
import com.google.common.hash.Hashing;
import java.util.*;
import java.util.concurrent.*;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
/**
 * In-memory mock of {@link AmazonSQS}: messages are kept in per-URL queues inside
 * this object and never leave the JVM. Only {@code sendMessage}, {@code receiveMessage},
 * {@code deleteMessage}, {@code changeMessageVisibility} and {@code shutdown} are
 * implemented; every other operation throws {@link UnsupportedOperationException}.
 *
 * <p>Delayed sends and visibility timeouts are driven by an internal scheduler, so the
 * shared state is held in concurrent collections.
 *
 * <p>Sitting in the airport, unable to connect to the internet, this seemed
 * like a good use of my time, at least as compared with sleeping.
 *
 * @author Ben Fagin
 * @version 2013-05-28
 */
public class MockSQS implements AmazonSQS {

    /** Number of messages returned by a receive call when the request does not specify one. */
    private static final int DEFAULT_MAX_MESSAGES = 1;

    /** Largest batch a single receive call may ask for. */
    private static final int MAX_MESSAGES_LIMIT = 10;

    /** Visible messages, keyed by queue URL. Queues are created lazily on first use. */
    private final ConcurrentMap<String, Queue<MessageInfo>> queues = new ConcurrentHashMap<>();

    /** Runs delayed sends and the tasks that make unacknowledged messages visible again. */
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);

    /** Default visibility timeout in seconds; volatile because any thread may call setTimeout. */
    private volatile int timeout = 35 * 60;

    /** In-flight (received but not yet deleted) messages, keyed by receipt handle. */
    private final ConcurrentMap<String, ScheduledMessage> receivedMessages = new ConcurrentHashMap<>();

    /**
     * Adds a message to the queue named by the request URL, creating the queue if needed.
     * If the request carries a delay, the message only becomes visible after that many seconds.
     *
     * @param request must carry a non-null queue URL and message body
     * @return result holding the generated message id and the MD5 of the body
     * @throws NullPointerException if the queue URL or message body is null
     */
    @Override
    public SendMessageResult sendMessage(final SendMessageRequest request) throws AmazonServiceException, AmazonClientException {
        final Queue<MessageInfo> queue = getOrCreateQueue(request.getQueueUrl());
        final MessageInfo info = new MessageInfo(
                UUID.randomUUID().toString(),
                checkNotNull(request.getMessageBody()));

        final Integer delaySeconds = request.getDelaySeconds();
        if (delaySeconds == null) {
            queue.add(info);
        } else {
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    queue.add(info);
                }
            };
            executor.schedule(task, delaySeconds, TimeUnit.SECONDS);
        }

        return new SendMessageResult().withMessageId(info.id).withMD5OfMessageBody(info.hash());
    }

    /**
     * Takes up to {@code MaxNumberOfMessages} messages (default 1, at most 10) off the queue.
     * Each returned message gets a fresh receipt handle and is put back on the queue once its
     * visibility timeout elapses, unless it has been deleted in the meantime.
     *
     * @param request must carry a non-null queue URL
     * @return the received messages, possibly none
     * @throws IllegalArgumentException if the requested maximum is outside 1..10
     */
    @Override
    public ReceiveMessageResult receiveMessage(ReceiveMessageRequest request) throws AmazonServiceException, AmazonClientException {
        final Queue<MessageInfo> queue = getOrCreateQueue(request.getQueueUrl());
        List<Message> messages = new ArrayList<>();

        Integer max = request.getMaxNumberOfMessages();
        if (max == null) {
            // An unset maximum means "one message", not zero (zero would fail the check below).
            max = DEFAULT_MAX_MESSAGES;
        }
        checkArgument(max > 0 && max <= MAX_MESSAGES_LIMIT,
                "MaxNumberOfMessages must be between 1 and %s, got %s", MAX_MESSAGES_LIMIT, max);

        Integer visibilityTimeout = request.getVisibilityTimeout();
        if (visibilityTimeout == null) {
            visibilityTimeout = timeout;
        }

        for (int i = 0; i < max; ++i) {
            final MessageInfo info = queue.poll();
            if (info == null) {
                break; // queue drained
            }

            final String receiptHandle = UUID.randomUUID().toString();
            Message message = new Message();
            message.setBody(info.body);
            message.setMessageId(info.id);
            message.setMD5OfBody(info.hash());
            message.setReceiptHandle(receiptHandle);
            messages.add(message);

            Runnable command = new Runnable() {
                @Override
                public void run() {
                    // Only whoever removes the handle owns the message: if deleteMessage got
                    // there first the message must not reappear on the queue.
                    if (receivedMessages.remove(receiptHandle) != null) {
                        queue.add(info);
                    }
                }
            };

            // Register the handle before scheduling so that even a zero-second timeout
            // finds (and removes) the entry instead of leaving a stale handle behind.
            ScheduledMessage scheduled = new ScheduledMessage(command);
            receivedMessages.put(receiptHandle, scheduled);
            scheduled.future = executor.schedule(command, visibilityTimeout, TimeUnit.SECONDS);
        }

        return new ReceiveMessageResult().withMessages(messages);
    }

    /**
     * Deletes an in-flight message by cancelling the task which would have re-added it
     * to the queue.
     *
     * @throws RuntimeException if the receipt handle is unknown (never issued, already
     *         deleted, or its visibility timeout has expired)
     */
    @Override
    public void deleteMessage(DeleteMessageRequest request) throws AmazonServiceException, AmazonClientException {
        ScheduledMessage scheduled = receivedMessages.remove(request.getReceiptHandle());
        if (scheduled == null) {
            throw new RuntimeException("message does not exist");
        }
        scheduled.future.cancel(false);
    }

    /**
     * Restarts the visibility timeout of an in-flight message with the requested number
     * of seconds.
     *
     * @throws RuntimeException if the receipt handle is unknown
     * @throws NullPointerException if the request has no visibility timeout
     */
    @Override
    public void changeMessageVisibility(ChangeMessageVisibilityRequest request) throws AmazonServiceException, AmazonClientException {
        ScheduledMessage scheduled = receivedMessages.get(request.getReceiptHandle());
        if (scheduled == null) {
            throw new RuntimeException("message does not exist");
        }
        Integer visibilityTimeout = checkNotNull(request.getVisibilityTimeout());
        scheduled.future.cancel(false);
        scheduled.future = executor.schedule(scheduled.runnable, visibilityTimeout, TimeUnit.SECONDS);
    }

    /**
     * Stops the scheduler and discards all queued and in-flight messages.
     */
    @Override
    public void shutdown() {
        // shutdownNow() drops pending delayed tasks; plain shutdown() would still run them,
        // keeping worker threads alive until the longest outstanding timeout expires.
        executor.shutdownNow();
        receivedMessages.clear();
        queues.clear();
    }

    /** Immutable id/body pair for one message. */
    private static class MessageInfo {
        final String id;
        final String body;

        MessageInfo(String id, String body) {
            this.id = id;
            this.body = body;
        }

        /** @return hex MD5 digest of the UTF-8 encoded body */
        String hash() {
            return Hashing.md5().hashString(body, java.nio.charset.StandardCharsets.UTF_8).toString();
        }
    }

    /** An in-flight message: its redelivery task and the currently scheduled run of that task. */
    private static class ScheduledMessage {
        final Runnable runnable;
        /** Replaced by changeMessageVisibility, possibly from another thread. */
        volatile ScheduledFuture<?> future;

        ScheduledMessage(Runnable runnable) {
            this.runnable = runnable;
        }
    }

    /**
     * Returns the queue registered for the URL, creating it on first use.
     *
     * @throws NullPointerException if url is null
     */
    private Queue<MessageInfo> getOrCreateQueue(String url) {
        Queue<MessageInfo> queue = queues.get(checkNotNull(url));
        if (queue == null) {
            Queue<MessageInfo> created = new ConcurrentLinkedQueue<>();
            queue = queues.putIfAbsent(url, created);
            if (queue == null) {
                queue = created; // we won the race; ours is the registered queue
            }
        }
        return queue;
    }

    /**
     * Sets the default visibility timeout, in seconds, used by receiveMessage when the
     * request does not specify one.
     */
    public void setTimeout(int timeout) {
        this.timeout = timeout;
    }

    //---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---o---//

    /** Builds the exception thrown by every operation this mock does not support. */
    private static UnsupportedOperationException notImplemented() {
        return new UnsupportedOperationException("not implemented");
    }

    @Override
    public void setEndpoint(String endpoint) throws IllegalArgumentException {
        throw notImplemented();
    }

    @Override
    public void setRegion(Region region) throws IllegalArgumentException {
        throw notImplemented();
    }

    @Override
    public void setQueueAttributes(SetQueueAttributesRequest setQueueAttributesRequest) throws AmazonServiceException, AmazonClientException {
        throw notImplemented();
    }

    @Override
    public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest) throws AmazonServiceException, AmazonClientException {
        throw notImplemented();
    }

    @Override
    public GetQueueUrlResult getQueueUrl(GetQueueUrlRequest getQueueUrlRequest) throws AmazonServiceException, AmazonClientException {
        throw notImplemented();
    }

    @Override
    public void removePermission(RemovePermissionRequest removePermissionRequest) throws AmazonServiceException, AmazonClientException {
        throw notImplemented();
    }

    @Override
    public GetQueueAttributesResult getQueueAttributes(GetQueueAttributesRequest getQueueAttributesRequest) throws AmazonServiceException, AmazonClientException {
        throw notImplemented();
    }

    @Override
    public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest sendMessageBatchRequest) throws AmazonServiceException, AmazonClientException {
        throw notImplemented();
    }

    @Override
    public void deleteQueue(DeleteQueueRequest deleteQueueRequest) throws AmazonServiceException, AmazonClientException {
        throw notImplemented();
    }

    @Override
    public ListQueuesResult listQueues(ListQueuesRequest listQueuesRequest) throws AmazonServiceException, AmazonClientException {
        throw notImplemented();
    }

    @Override
    public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) throws AmazonServiceException, AmazonClientException {
        throw notImplemented();
    }

    @Override
    public CreateQueueResult createQueue(CreateQueueRequest createQueueRequest) throws AmazonServiceException, AmazonClientException {
        throw notImplemented();
    }

    @Override
    public void addPermission(AddPermissionRequest addPermissionRequest) throws AmazonServiceException, AmazonClientException {
        throw notImplemented();
    }

    @Override
    public ListQueuesResult listQueues() throws AmazonServiceException, AmazonClientException {
        throw notImplemented();
    }

    @Override
    public ResponseMetadata getCachedResponseMetadata(AmazonWebServiceRequest request) {
        throw notImplemented();
    }
}
@hulbert
Copy link

hulbert commented Mar 19, 2016

@UnquietCode this is pretty cool. Do you know if Amazon's actual SQS implementation re-enqueues messages back to the end or head of the queue when they haven't been deleted after the visibility timeout expires? (You're using queue#add https://gist.github.com/UnquietCode/5717942#file-mocksqs-java-L85-L90 but I would have thought they add to the head but can't find any concrete answer).

@UnquietCode
Copy link
Author

Probably they do end up back at the head of the queue, or the queue is ordered by time of arrival. Here that would mean using something like a deque, but for simplicity I've chosen to use a Queue instance instead.

@orvyl
Copy link

orvyl commented Feb 28, 2017

Hi!

Just wanted to say THANK YOU VERY MUCH for this one. It really helps us big time. I created our own implementation https://gist.github.com/orvyl/870fba7ddbaeeb6ad651930a1c3c98aa because we are now using Java8 and aws-java-sdk v1.11.86 and it was 100% based on this one. Again, thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment