Skip to content

Instantly share code, notes, and snippets.

@sixeyed
Last active September 1, 2020 17:37
Show Gist options
  • Save sixeyed/6941016 to your computer and use it in GitHub Desktop.
Save sixeyed/6941016 to your computer and use it in GitHub Desktop.
Wrappers for the SQS and SNS clients in the AWS SDK for .NET v2
using Amazon;
using Amazon.SQS;
using Amazon.SQS.Model;
using Amazon.SQS.Util;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
namespace Sixeyed.Blogging.Aws
{
public class QueueClient
{
private AmazonSQSClient _sqsClient;
public string QueueName { get; private set; }
internal string QueueUrl { get; private set; }
internal string QueueArn { get; private set; }
private Action<Message> _receiveAction;
private CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
public QueueClient(string queueName)
{
_sqsClient = new AmazonSQSClient(RegionEndpoint.EUWest1);
QueueName = queueName;
Ensure();
}
public void Ensure()
{
if (!Exists())
{
var request = new CreateQueueRequest();
request.QueueName = QueueName;
var response = _sqsClient.CreateQueue(request);
QueueUrl = response.QueueUrl;
}
}
public bool Exists()
{
var exists = false;
var queues = _sqsClient.ListQueues();
var matchString = string.Format("/{0}", QueueName);
var matches = queues.QueueUrls.Where(x => x.EndsWith(QueueName));
if (matches.Count() == 1)
{
exists = true;
QueueUrl = matches.ElementAt(0);
PopulateArn();
}
return exists;
}
private void PopulateArn()
{
var attributes = _sqsClient.GetQueueAttributes(new GetQueueAttributesRequest
{
AttributeNames = new List<string>(new string[] {SQSConstants.ATTRIBUTE_QUEUE_ARN}),
QueueUrl = QueueUrl
});
QueueArn = attributes.QueueARN;
}
public void DeleteQueue()
{
var request = new DeleteQueueRequest();
request.QueueUrl = QueueUrl;
_sqsClient.DeleteQueue(request);
}
public void Unsubscribe()
{
_cancellationTokenSource.Cancel();
}
private async void Subscribe()
{
if (!_cancellationTokenSource.IsCancellationRequested)
{
var request = new ReceiveMessageRequest { MaxNumberOfMessages = 10 };
request.QueueUrl = QueueUrl;
var result = await _sqsClient.ReceiveMessageAsync(request, _cancellationTokenSource.Token);
if (result.Messages.Count > 0)
{
foreach (var message in result.Messages)
{
if (_receiveAction != null && message != null)
{
_receiveAction(message);
DeleteMessage(message.ReceiptHandle);
}
}
}
}
if (!_cancellationTokenSource.IsCancellationRequested)
{
Subscribe();
}
}
private DeleteMessageResponse DeleteMessage(string receiptHandle)
{
var request = new DeleteMessageRequest();
request.QueueUrl = QueueUrl;
request.ReceiptHandle = receiptHandle;
return _sqsClient.DeleteMessage(request);
}
public void Subscribe(Action<Message> receiveAction)
{
_receiveAction = receiveAction;
_cancellationTokenSource = new CancellationTokenSource();
Subscribe();
}
public void Send(Message message)
{
var request = new SendMessageRequest();
request.QueueUrl = QueueUrl;
request.MessageBody = message.Body;
_sqsClient.SendMessage(request);
}
internal void AllowSnsToSendMessages(TopicClient topicClient)
{
var policy = SetQueueAttributeRequest.AllowSendFormat.Replace("%QueueArn%", QueueArn).Replace("%TopicArn%", topicClient.TopicArn);
var request = new SetQueueAttributesRequest();
request.Attributes.Add("Policy", policy);
request.QueueUrl = QueueUrl;
var response = _sqsClient.SetQueueAttributes(request);
}
public bool HasMessages()
{
var request = new GetQueueAttributesRequest
{
QueueUrl = QueueUrl,
AttributeNames = new List<string>(new string[] { SQSConstants.ATTRIBUTE_APPROXIMATE_NUMBER_OF_MESSAGES})
};
var response = _sqsClient.GetQueueAttributes(request);
return response.ApproximateNumberOfMessages > 0;
}
public bool IsListening()
{
return !_cancellationTokenSource.IsCancellationRequested;
}
private struct SetQueueAttributeRequest
{
public const string AllowSendFormat =
@"{
""Statement"": [
{
""Sid"": ""MySQSPolicy001"",
""Effect"": ""Allow"",
""Principal"": {
""AWS"": ""*""
},
""Action"": ""sqs:SendMessage"",
""Resource"": ""%QueueArn%"",
""Condition"": {
""ArnEquals"": {
""aws:SourceArn"": ""%TopicArn%""
}
}
}
]
}";
}
}
}
using Amazon;
using Amazon.SimpleNotificationService;
using Amazon.SimpleNotificationService.Model;
using Amazon.SQS.Model;
using System;
using System.Linq;
namespace Sixeyed.Blogging.Aws
{
public class TopicClient
{
private AmazonSimpleNotificationServiceClient _snsClient;
public string TopicName { get; private set; }
public string SubscriptionName { get; private set; }
internal string TopicArn { get; private set; }
private string _subscriptionArn;
private QueueClient _queueClient;
public TopicClient(string topicName) : this(topicName, null) { }
public TopicClient(string topicName, string subscriptionName)
{
_snsClient = new AmazonSimpleNotificationServiceClient(RegionEndpoint.EUWest1);
TopicName = topicName;
SubscriptionName = subscriptionName;
Ensure();
}
private void Ensure()
{
if (!TopicExists())
{
var request = new CreateTopicRequest();
request.Name = TopicName;
var response = _snsClient.CreateTopic(request);
TopicArn = response.TopicArn;
}
if (!string.IsNullOrEmpty(SubscriptionName))
{
_queueClient = new QueueClient(SubscriptionName);
if (!SubscriptionExists())
{
var response = _snsClient.Subscribe(new SubscribeRequest
{
TopicArn = TopicArn,
Protocol = "sqs",
Endpoint = _queueClient.QueueArn
});
_subscriptionArn = response.SubscriptionArn;
var attrRequest = new SetSubscriptionAttributesRequest
{
AttributeName = "RawMessageDelivery",
AttributeValue = "true",
SubscriptionArn = _subscriptionArn
};
_snsClient.SetSubscriptionAttributes(attrRequest);
_queueClient.AllowSnsToSendMessages(this);
}
}
}
private bool TopicExists()
{
var exists = false;
var response = _snsClient.ListTopics();
var matchString = string.Format(":{0}", TopicName);
var matches = response.Topics.Where(x => x.TopicArn.EndsWith(matchString));
if (matches.Count() == 1)
{
exists = true;
TopicArn = matches.ElementAt(0).TopicArn;
}
return exists;
}
private bool SubscriptionExists()
{
var exists = false;
var request = new ListSubscriptionsByTopicRequest {
TopicArn = TopicArn
};
var response = _snsClient.ListSubscriptionsByTopic(request);
var matchString = string.Format(":{0}", SubscriptionName);
var matches = response.Subscriptions.Where(x => x.Endpoint.EndsWith(matchString));
if (matches.Count() == 1)
{
exists = true;
_subscriptionArn = matches.ElementAt(0).SubscriptionArn;
}
return exists;
}
public void Subscribe(Action<Message> receiveAction)
{
_queueClient.Subscribe(receiveAction);
}
public void Unsubscribe()
{
_queueClient.Unsubscribe();
}
public bool IsListening()
{
return _queueClient.IsListening();
}
public bool HasMessages()
{
return _queueClient.HasMessages();
}
public void Publish(Message message)
{
var request = new PublishRequest();
request.TopicArn = TopicArn;
request.Message = message.Body;
var response = _snsClient.Publish(request);
}
public void DeleteSubscription()
{
_queueClient.DeleteQueue();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment