Skip to content

Instantly share code, notes, and snippets.

@PeterKneale
Created October 27, 2022 21:58
Show Gist options
  • Save PeterKneale/60e7eab08ab5d8d31845a8633caeb394 to your computer and use it in GitHub Desktop.
Save PeterKneale/60e7eab08ab5d8d31845a8633caeb394 to your computer and use it in GitHub Desktop.
Minimal bus
using Amazon.SimpleNotificationService;
using Amazon.SQS;
using Amazon.SQS.Model;
using Newtonsoft.Json;
namespace BizCover.Web.SupportPortal.Infrastructure;
internal static class Extensions
{
public static void AddBus<THandler>(this IServiceCollection services) where THandler : class, IMessageHandler =>
services
// Register the bus itself used for interacting with AWS SQS and SNS
.AddTransient<IBus, Bus>()
// Register the message handler that will be passed messages as they arrive
.AddTransient<IMessageHandler, THandler>()
// Register the background service polling SQS
.AddHostedService<BusHostedService>()
// Optional, decorate the bus with a logger using scrutor
.Decorate<IBus, BusLogger>();
// Utility extension method to trim a string
internal static string TrimTo(this string s, int maximumLength) =>
s.Length <= maximumLength
? s
: s[..maximumLength];
}
internal interface IBus
{
// Ensures that a SNS topic exists, its name matching the message type. Returns the topic's arn
Task<string> EnsureTopicExists<T>(CancellationToken cancellationToken);
// Ensures that a SQS queue exists, its name matching the service. Returns the queue's url
Task<string> EnsureQueueExists(CancellationToken cancellationToken);
// Ensures that a SQS dead letter queue exists, its name matching the service. Returns the queue's url
Task<string> EnsureDeadLetterQueueExists(CancellationToken cancellationToken);
// Ensure that this service's queue has a subscription the SNS topic matching the type name
Task EnsureSubscriptionExists<T>(CancellationToken cancellationToken);
// Publish a message to a topic, the topics arn was returned when the topics existence was verified
Task PublishToTopic<T>(string topicArn, T message, CancellationToken cancellationToken);
// Receive messages from a queue, the queues url was returned when the queues existence was verified
Task<IReadOnlyCollection<Message>> GetMessagesFromQueue(string queueUrl, CancellationToken cancellationToken);
// Delete a received message from the queue. The queues url was returned when the queues existence was verified. The receipt handle is on the message received from the queue.
Task DeleteMessageFromQueue(string queueUrl, string receiptHandle, CancellationToken cancellationToken);
}
internal interface IMessageHandler
{
Task Handle(Message message, CancellationToken stoppingToken);
}
internal class Bus : IBus
{
private const int MaxTopicNameLength = 256;
private const int MaxQueueNameLength = 80;
private readonly IAmazonSQS _sqs;
private readonly IAmazonSimpleNotificationService _sns;
private readonly string _prefix; // eg: au-dev
private readonly string _service; // eg: discounts
public Bus(IAmazonSQS sqs, IAmazonSimpleNotificationService sns, IConfiguration configuration)
{
_sqs = sqs;
_sns = sns;
_prefix = configuration["bus:prefix"] ?? throw new Exception("Configuration setting 'bus:prefix' is missing");
_service = configuration["bus:service"] ?? throw new Exception("Configuration setting 'bus:service' is missing");
}
public async Task<string> EnsureTopicExists<T>(CancellationToken cancellationToken)
{
var topic = FormatTopicName<T>();
var (exists, arn) = await TopicExists(topic, cancellationToken);
if (exists) return arn!;
var response = await _sns.CreateTopicAsync(topic, cancellationToken);
return response.TopicArn;
}
public async Task<string> EnsureQueueExists(CancellationToken cancellationToken)
{
var name = FormatQueueName();
var (exists, url) = await QueueExists(name, cancellationToken);
if (exists) return url!;
var response = await _sqs.CreateQueueAsync(name, cancellationToken);
return response.QueueUrl;
}
public async Task<string> EnsureDeadLetterQueueExists(CancellationToken cancellationToken)
{
var name = FormatDeadLetterQueueName();
var (exists, url) = await QueueExists(name, cancellationToken);
if (exists) return url!;
var response = await _sqs.CreateQueueAsync(name, cancellationToken);
return response.QueueUrl;
// todo: setup redrive from main queue to this
}
public async Task EnsureSubscriptionExists<T>(CancellationToken cancellationToken)
{
var topicArn = await EnsureTopicExists<T>(cancellationToken);
var queueUrl = await EnsureQueueExists(cancellationToken);
var (exists, _) = await SubscriptionExists(topicArn, queueUrl, cancellationToken);
if (exists) return;
await _sns.SubscribeQueueToTopicsAsync(new List<string> {topicArn}, _sqs, queueUrl);
}
public async Task PublishToTopic<T>(string topicArn, T message, CancellationToken cancellationToken)
{
await _sns.PublishAsync(topicArn, JsonConvert.SerializeObject(message), cancellationToken);
}
public async Task<IReadOnlyCollection<Message>> GetMessagesFromQueue(string queueUrl, CancellationToken cancellationToken)
{
var response = await _sqs.ReceiveMessageAsync(queueUrl, cancellationToken);
return response.Messages;
}
public async Task DeleteMessageFromQueue(string queueUrl, string receiptHandle, CancellationToken cancellationToken) =>
await _sqs.DeleteMessageAsync(queueUrl, receiptHandle, cancellationToken);
private async Task<(bool exists, string? arn)> TopicExists(string name, CancellationToken cancellationToken)
{
var response = await _sns.FindTopicAsync(name);
return string.IsNullOrEmpty(response.TopicArn)
? (false, null)
: (true, response.TopicArn);
}
private async Task<(bool exists, string? url)> QueueExists(string name, CancellationToken cancellationToken)
{
var response = await _sqs.GetQueueUrlAsync(name, cancellationToken);
return string.IsNullOrEmpty(response.QueueUrl)
? (false, null)
: (true, response.QueueUrl);
}
private async Task<(bool exists, string? arn)> SubscriptionExists(string topic, string queueUrl, CancellationToken cancellationToken)
{
// limited to 100
var response = await _sns.ListSubscriptionsByTopicAsync(topic, cancellationToken);
var subscription = response.Subscriptions.SingleOrDefault(x => x.Endpoint == queueUrl);
return subscription == null
? (false, null)
: (true, subscription.SubscriptionArn);
}
// {prefix}-{service}-{type}
internal string FormatTopicName<T>() =>
$"{_prefix}-{_service}-{typeof(T).Name}"
.ToLowerInvariant()
.TrimTo(MaxTopicNameLength);
// {prefix}-{service}
internal string FormatQueueName() =>
$"{_prefix}-{_service}"
.ToLowerInvariant()
.TrimTo(MaxQueueNameLength);
// {prefix}-{service}-dlq
internal string FormatDeadLetterQueueName() =>
$"{_prefix}-{_service}-dlq"
.ToLowerInvariant()
.TrimTo(MaxQueueNameLength);
}
internal class BusHostedService : BackgroundService
{
private readonly IBus _bus;
private readonly IServiceProvider _provider;
private readonly ILogger<BusHostedService> _log;
private readonly TimeSpan _queueErrorDelay = TimeSpan.FromSeconds(5);
private readonly TimeSpan _queueEmptyDelay = TimeSpan.FromSeconds(5);
public BusHostedService(IBus bus, IServiceProvider provider, ILogger<BusHostedService> log)
{
_bus = bus;
_provider = provider;
_log = log;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
_log.LogInformation("Background service starting");
var queueUrl = await _bus.EnsureQueueExists(stoppingToken);
_log.LogInformation($"Background service processing message from {queueUrl}");
await ProcessQueue(queueUrl, stoppingToken);
}
catch (Exception e)
{
_log.LogError(e, $"Background service has encountered an error and will restart in {_queueErrorDelay}");
await Task.Delay(_queueErrorDelay, stoppingToken);
}
}
}
private async Task ProcessQueue(string queueUrl, CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var messages = await _bus.GetMessagesFromQueue(queueUrl, stoppingToken);
if (messages.Any())
{
foreach (var message in messages)
{
_log.LogInformation("Background service processing message {message}", message.MessageId);
using var scope = _provider.CreateScope();
var handler = scope.ServiceProvider.GetService<IMessageHandler>();
if (handler == null)
{
throw new Exception("No message handler has been registered");
}
await handler.Handle(message, stoppingToken);
await _bus.DeleteMessageFromQueue(queueUrl, message.ReceiptHandle, stoppingToken);
}
}
else
{
await Task.Delay(_queueEmptyDelay, stoppingToken);
}
}
}
}
internal class BusLogger : IBus
{
private readonly IBus _inner;
private readonly ILogger<BusLogger> _log;
public BusLogger(IBus inner, ILogger<BusLogger> log)
{
_inner = inner;
_log = log;
}
public async Task<string> EnsureTopicExists<T>(CancellationToken cancellationToken)
{
_log.LogInformation($"Ensuring a sns topic exists for {typeof(T).Name}");
return await _inner.EnsureTopicExists<T>(cancellationToken);
}
public async Task<string> EnsureQueueExists(CancellationToken cancellationToken)
{
_log.LogInformation("Ensuring a sqs queue exists for this service");
return await _inner.EnsureQueueExists(cancellationToken);
}
public async Task<string> EnsureDeadLetterQueueExists(CancellationToken cancellationToken)
{
_log.LogInformation("Ensuring a sqs dead letter queue exists for this service");
return await _inner.EnsureDeadLetterQueueExists(cancellationToken);
}
public async Task EnsureSubscriptionExists<T>(CancellationToken cancellationToken)
{
_log.LogInformation($"Ensuring a sns subscription exists for this sns topic {typeof(T).Name} to this service");
await _inner.EnsureSubscriptionExists<T>(cancellationToken);
}
public async Task PublishToTopic<T>(string topicArn, T message, CancellationToken cancellationToken)
{
_log.LogInformation($"Publishing to {typeof(T).Name} a message of type {typeof(T).Name}");
await _inner.PublishToTopic(topicArn, message, cancellationToken);
}
public async Task<IReadOnlyCollection<Message>> GetMessagesFromQueue(string queueUrl, CancellationToken cancellationToken)
{
_log.LogInformation($"Getting message from sqs queue {queueUrl}");
var messages = await _inner.GetMessagesFromQueue(queueUrl, cancellationToken);
_log.LogInformation($"Received message from sqs queue {messages.Count}");
return messages;
}
public async Task DeleteMessageFromQueue(string queueUrl, string receiptHandle, CancellationToken cancellationToken)
{
_log.LogInformation($"Deleting message {receiptHandle} from sqs queue {queueUrl}");
await _inner.DeleteMessageFromQueue(queueUrl, receiptHandle, cancellationToken);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment