Skip to content

Instantly share code, notes, and snippets.

@calebickler
Last active August 1, 2022 16:00
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save calebickler/899b577088b8e4189dd3a8e3ba07ebaa to your computer and use it in GitHub Desktop.
Save calebickler/899b577088b8e4189dd3a8e3ba07ebaa to your computer and use it in GitHub Desktop.
Subscribing to an SQS queue (which only supports polling) from a .NET Core hosted service
using Utility.Environment;
using Utility.Queue.Handler;
using Utility.Queue.Subscribers;
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Utility.Queue
{
    /// <summary>
    /// Hosted background service that creates an <see cref="SQSQueueSubscriber"/>,
    /// points it at the queue name resolved for <typeparamref name="U"/> and then
    /// polls that queue until the host requests shutdown.
    /// </summary>
    /// <typeparam name="T">Handler type invoked for each received message.</typeparam>
    /// <typeparam name="U">Message type; also used to resolve the queue name.</typeparam>
    public class QueueSubscriberService<T, U> : BackgroundService where T : IMessageHandler<U>
    {
        private readonly IServiceProvider _serviceProvider;
        private readonly IEnvironmentRequester _environmentRequester;

        /// <summary>
        /// Creates the service. The constructor must carry the class name
        /// (<c>QueueSubscriberService</c>); any other name is rejected by the compiler.
        /// </summary>
        /// <param name="serviceProvider">Provider forwarded to the subscriber.</param>
        /// <param name="environmentRequester">Source of environment configuration values.</param>
        public QueueSubscriberService(IServiceProvider serviceProvider,
            IEnvironmentRequester environmentRequester)
        {
            _serviceProvider = serviceProvider;
            _environmentRequester = environmentRequester;
        }

        /// <summary>
        /// Sets up the queue and runs the polling loop for the lifetime of the host.
        /// </summary>
        /// <param name="stoppingToken">Signalled by the host when the service should stop.</param>
        /// <returns>A task that completes when polling ends.</returns>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            // Use the subscriber type and method names exactly as they are declared
            // in Utility.Queue.Subscribers (SQSQueueSubscriber / Poll).
            var subscriber = new SQSQueueSubscriber(_environmentRequester, _serviceProvider);
            var nameResolver = new NameResolver(_environmentRequester);

            await subscriber.SetQueueName(nameResolver.GetQueueName<U>());
            await subscriber.Poll<T, U>(_serviceProvider, stoppingToken);
        }
    }
}
using Amazon.SQS;
using Amazon.SQS.Model;
using Utility.Environment;
using Utility.Queue.Handler;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Utility.Queue.Subscribers
{
    /// <summary>
    /// Polling subscriber for a single Amazon SQS queue. Credentials and the endpoint
    /// base URL are read from the supplied <see cref="IEnvironmentRequester"/>.
    /// Call <see cref="SetQueueName"/> once before <see cref="Poll{T, U}"/>.
    /// </summary>
    public class SQSQueueSubscriber
    {
        private readonly IEnvironmentRequester _environmentRequester;
        private readonly IServiceProvider _serviceProvider;
        private readonly string _queueBaseUrl;
        private readonly string _accessKey;
        private readonly string _secretAccessKey;
        private readonly AmazonSQSConfig _sqsConfig;
        private string _queueUrl;

        /// <summary>
        /// Reads the "AWSAccessKeyId", "AWSSecretAccessKey" and "SQSEndpointBase"
        /// variables and prepares the SQS client configuration.
        /// </summary>
        /// <param name="environmentRequester">Source of the configuration variables.</param>
        /// <param name="serviceProvider">Default provider handed to the handler caller.</param>
        /// <exception cref="InvalidOperationException">
        /// Thrown when "SQSEndpointBase" is not available.
        /// </exception>
        public SQSQueueSubscriber(IEnvironmentRequester environmentRequester,
            IServiceProvider serviceProvider)
        {
            _accessKey = environmentRequester.GetVariable("AWSAccessKeyId");
            _secretAccessKey = environmentRequester.GetVariable("AWSSecretAccessKey");
            _queueBaseUrl = environmentRequester.GetVariable("SQSEndpointBase");
            if (_queueBaseUrl == null)
            {
                // Fail with a clear message instead of a NullReferenceException on Replace below.
                throw new InvalidOperationException("Environment variable 'SQSEndpointBase' is not set.");
            }

            _sqsConfig = new AmazonSQSConfig
            {
                // Locally /queue will be added for elasticmq, remove. Live urls will never have /queue.
                ServiceURL = _queueBaseUrl.Replace("/queue", "")
            };
            _environmentRequester = environmentRequester;
            _serviceProvider = serviceProvider;
        }

        /// <summary>
        /// Records the URL of the queue to poll (base URL + "/" + name) and issues a
        /// create-queue request for that name.
        /// </summary>
        /// <param name="queueName">Name of the queue; must not be null or empty.</param>
        /// <exception cref="ArgumentException">Thrown when <paramref name="queueName"/> is null or empty.</exception>
        public async Task SetQueueName(string queueName)
        {
            if (string.IsNullOrEmpty(queueName))
            {
                throw new ArgumentException("Queue name must not be null or empty.", nameof(queueName));
            }

            _queueUrl = _queueBaseUrl + "/" + queueName;
            using (var client = new AmazonSQSClient(_accessKey, _secretAccessKey, _sqsConfig))
            {
                var createQueueRequest = new CreateQueueRequest
                {
                    QueueName = queueName
                };
                await client.CreateQueueAsync(createQueueRequest);
            }
        }

        /// <summary>
        /// Long-polls the queue one message at a time until <paramref name="stoppingToken"/>
        /// is signalled. Each message is dispatched to <c>HandlerCaller.CallHandler</c> on a
        /// background task and then deleted from the queue without waiting for the handler.
        /// If an exception occurs after a message was received, it is logged to the console
        /// and the message is still deleted.
        /// </summary>
        /// <typeparam name="T">Handler type for the message.</typeparam>
        /// <typeparam name="U">Message type.</typeparam>
        /// <param name="serviceProvider">
        /// Provider handed to the handler caller; when null, the provider given to the
        /// constructor is used.
        /// </param>
        /// <param name="stoppingToken">Stops the loop and aborts a pending long poll.</param>
        /// <exception cref="InvalidOperationException">
        /// Thrown when <see cref="SetQueueName"/> has not been called yet.
        /// </exception>
        public async Task Poll<T, U>(IServiceProvider serviceProvider, CancellationToken stoppingToken) where T : IMessageHandler<U>
        {
            if (string.IsNullOrEmpty(_queueUrl))
            {
                // Without a queue URL every receive would fail and the loop would spin on errors.
                throw new InvalidOperationException("SetQueueName must be called before Poll.");
            }

            var handlerServices = serviceProvider ?? _serviceProvider;

            using (var client = new AmazonSQSClient(_accessKey, _secretAccessKey, _sqsConfig))
            {
                while (!stoppingToken.IsCancellationRequested)
                {
                    string receiptHandle = null;
                    try
                    {
                        var request = new ReceiveMessageRequest
                        {
                            QueueUrl = _queueUrl,
                            MaxNumberOfMessages = 1,
                            WaitTimeSeconds = 20
                        };

                        // Passing the token lets shutdown interrupt the 20 second long poll.
                        var response = await client.ReceiveMessageAsync(request, stoppingToken);
                        if (response.Messages == null || response.Messages.Count == 0)
                        {
                            continue;
                        }

                        var sqsMessage = response.Messages[0];
                        receiptHandle = sqsMessage.ReceiptHandle;

                        var caller = new HandlerCaller(_environmentRequester, handlerServices);
                        Task handlerTask = Task.Run(() => caller.CallHandler<T, U>(sqsMessage.MessageId, sqsMessage.Body));
                        LogHandlerFaults(handlerTask);

                        // Deliberately not cancellable: the handler has already been started,
                        // so the message should be removed even during shutdown.
                        await client.DeleteMessageAsync(_queueUrl, receiptHandle);
                    }
                    catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                    {
                        // Shutdown requested while waiting for messages; leave quietly.
                        break;
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine(ex.ToString());
                        if (!string.IsNullOrEmpty(receiptHandle))
                        {
                            await TryDeleteMessageAsync(client, receiptHandle);
                        }
                    }
                }
            }
        }

        // Writes a handler failure to the console so a faulted background task is not lost silently.
        private static void LogHandlerFaults(Task handlerTask)
        {
            handlerTask.ContinueWith(
                faulted => Console.WriteLine(faulted.Exception.ToString()),
                CancellationToken.None,
                TaskContinuationOptions.OnlyOnFaulted,
                TaskScheduler.Default);
        }

        // Best-effort delete used on the error path; a failure here is logged and must not end the polling loop.
        private async Task TryDeleteMessageAsync(AmazonSQSClient client, string receiptHandle)
        {
            try
            {
                await client.DeleteMessageAsync(_queueUrl, receiptHandle);
            }
            catch (Exception deleteEx)
            {
                Console.WriteLine(deleteEx.ToString());
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment