Skip to content

Instantly share code, notes, and snippets.

@sixeyed
Created October 11, 2013 19:38
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save sixeyed/6940766 to your computer and use it in GitHub Desktop.
Save sixeyed/6940766 to your computer and use it in GitHub Desktop.
A wrapper for the SQS client in the AWS SDK for .NET v2, which uses the message-pump pattern
using Amazon;
using Amazon.SQS;
using Amazon.SQS.Model;
using Amazon.SQS.Util;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
namespace Sixeyed.Blogging.Aws
{
/// <summary>
/// Wraps an <see cref="AmazonSQSClient"/> for a single named queue: creates the queue
/// if it is missing, sends messages, and runs a receive loop ("message pump") that hands
/// each received message to a callback and then deletes it from the queue.
/// </summary>
public class QueueClient
{
    private readonly AmazonSQSClient _sqsClient;

    /// <summary>Name of the queue this client is bound to.</summary>
    public string QueueName { get; private set; }

    /// <summary>URL of the queue; set by <see cref="Exists"/> or <see cref="Ensure"/>.</summary>
    internal string QueueUrl { get; private set; }

    /// <summary>Queue ARN. Note: nothing in this class assigns it, so it stays null.</summary>
    internal string QueueArn { get; private set; }

    // Null until Subscribe is first called, so IsListening() is false before any subscription.
    private CancellationTokenSource _cancellationTokenSource;

    /// <summary>
    /// Creates a client for <paramref name="queueName"/> in the EU West 1 region and
    /// makes sure the queue exists.
    /// </summary>
    /// <param name="queueName">Name of the queue.</param>
    /// <exception cref="ArgumentException"><paramref name="queueName"/> is null or blank.</exception>
    public QueueClient(string queueName)
        : this(queueName, RegionEndpoint.EUWest1)
    {
    }

    /// <summary>
    /// Creates a client for <paramref name="queueName"/> in the given region and
    /// makes sure the queue exists.
    /// </summary>
    /// <param name="queueName">Name of the queue.</param>
    /// <param name="region">Region to connect to; EU West 1 is used when null.</param>
    /// <exception cref="ArgumentException"><paramref name="queueName"/> is null or blank.</exception>
    public QueueClient(string queueName, RegionEndpoint region)
    {
        if (string.IsNullOrWhiteSpace(queueName))
        {
            throw new ArgumentException("A queue name is required.", "queueName");
        }

        _sqsClient = new AmazonSQSClient(region ?? RegionEndpoint.EUWest1);
        QueueName = queueName;
        Ensure();
    }

    /// <summary>
    /// Creates the queue when <see cref="Exists"/> reports it is missing, and records its URL.
    /// </summary>
    public void Ensure()
    {
        if (Exists())
        {
            return;
        }

        var request = new CreateQueueRequest { QueueName = QueueName };
        var response = _sqsClient.CreateQueue(request);
        QueueUrl = response.QueueUrl;
    }

    /// <summary>
    /// Checks whether a queue named <see cref="QueueName"/> is listed, and if so stores its
    /// URL in <see cref="QueueUrl"/>.
    /// </summary>
    /// <returns>True when a queue URL ending in "/{QueueName}" was found.</returns>
    public bool Exists()
    {
        // Match the whole final URL segment: a bare suffix test would treat
        // ".../dead-orders" as a match for the queue "orders".
        var urlSuffix = "/" + QueueName;

        // The prefix filter narrows the listing on the server side; the exact
        // match is still decided locally below.
        var listing = _sqsClient.ListQueues(new ListQueuesRequest { QueueNamePrefix = QueueName });
        var queueUrl = listing.QueueUrls.FirstOrDefault(
            url => url != null && url.EndsWith(urlSuffix, StringComparison.Ordinal));

        if (queueUrl == null)
        {
            return false;
        }

        QueueUrl = queueUrl;
        return true;
    }

    /// <summary>
    /// Requests that the current message pump stops. Does nothing when no pump was started.
    /// </summary>
    public void Unsubscribe()
    {
        var current = _cancellationTokenSource;
        if (current != null)
        {
            current.Cancel();
        }
    }

    /// <summary>
    /// Receive loop. Repeatedly asks the queue for up to 10 messages, passes each one to
    /// <paramref name="handler"/>, then deletes it. Runs until <paramref name="token"/> is cancelled.
    /// </summary>
    /// <remarks>
    /// Kept as a fire-and-forget <c>async void</c> entry point so that failures in the handler
    /// or in the SQS calls still surface as unhandled exceptions rather than being silently
    /// dropped. Only the cancellation raised by this pump's own token is treated as a normal exit.
    /// </remarks>
    private async void RunMessagePump(Action<Message> handler, CancellationToken token)
    {
        try
        {
            while (!token.IsCancellationRequested)
            {
                var request = new ReceiveMessageRequest
                {
                    QueueUrl = QueueUrl,
                    MaxNumberOfMessages = 10
                };

                var result = await _sqsClient.ReceiveMessageAsync(request, token);

                foreach (var message in result.Messages)
                {
                    if (handler != null && message != null)
                    {
                        handler(message);
                        // Delete only after the handler returned without throwing.
                        DeleteMessage(message.ReceiptHandle);
                    }
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cancelling the token while a receive call is in flight is the normal
            // way out after Unsubscribe(); anything else is a genuine failure.
            if (!token.IsCancellationRequested)
            {
                throw;
            }
        }
    }

    /// <summary>Deletes the message identified by <paramref name="receiptHandle"/> from the queue.</summary>
    private DeleteMessageResponse DeleteMessage(string receiptHandle)
    {
        var request = new DeleteMessageRequest
        {
            QueueUrl = QueueUrl,
            ReceiptHandle = receiptHandle
        };
        return _sqsClient.DeleteMessage(request);
    }

    /// <summary>
    /// Starts a message pump that invokes <paramref name="receiveAction"/> for every received
    /// message. Any pump started by an earlier call is cancelled first, so at most one pump
    /// is active per client.
    /// </summary>
    /// <param name="receiveAction">
    /// Callback for each message. When null, received messages are neither handled nor deleted.
    /// </param>
    public void Subscribe(Action<Message> receiveAction)
    {
        // Stop the previous pump; each pump watches only the token it was started with.
        Unsubscribe();

        var source = new CancellationTokenSource();
        _cancellationTokenSource = source;
        RunMessagePump(receiveAction, source.Token);
    }

    /// <summary>Sends the body of <paramref name="message"/> to the queue.</summary>
    /// <param name="message">Message whose <c>Body</c> is sent.</param>
    /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
    public void Send(Message message)
    {
        if (message == null)
        {
            throw new ArgumentNullException("message");
        }

        var request = new SendMessageRequest
        {
            QueueUrl = QueueUrl,
            MessageBody = message.Body
        };
        _sqsClient.SendMessage(request);
    }

    /// <summary>
    /// Reports whether the queue's approximate message count is greater than zero.
    /// </summary>
    public bool HasMessages()
    {
        var request = new GetQueueAttributesRequest
        {
            QueueUrl = QueueUrl,
            AttributeNames = new List<string> { SQSConstants.ATTRIBUTE_APPROXIMATE_NUMBER_OF_MESSAGES }
        };
        var response = _sqsClient.GetQueueAttributes(request);
        return response.ApproximateNumberOfMessages > 0;
    }

    /// <summary>
    /// Reports whether a pump has been started with <see cref="Subscribe(Action{Message})"/>
    /// and has not since been asked to stop.
    /// </summary>
    public bool IsListening()
    {
        var current = _cancellationTokenSource;
        return current != null && !current.IsCancellationRequested;
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment