Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
"MessageBroker": {
"Connection": "Endpoint=sb://yournamespace.windows.net/;SharedAccessKeyName=email;SharedAccessKey=awesomesecret",
"EmailMessageTopic": "email-message-queue",
"HealthCheckConnection": "Endpoint=sb://yournamespace.windows.net/;SharedAccessKeyName=healthcheck;SharedAccessKey=awesomesecret",
"HealthCheckTopic": "azuretopic"
},
"UseInMemoryMessageBroker": true,
using System;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Services.Emails;
using Services.Infrastructure.Azure;
using Utils.ValueObjects;
namespace YourNamespace
{
// https://gist.github.com/danielhunex/c1cfed093396b6b43ec927c63f3540af
/// <summary>
/// Hosted consumer for email messages: listens on the entity named by
/// <c>BrokerSettings.EmailMessageTopic</c> and forwards every message body to <see cref="IEmail"/>.
/// </summary>
public class AzureBrokerEmailConsumerBackService : AzureBusTopicConsumerBase
{
    /// <summary>
    /// Passes all dependencies straight through to the base consumer.
    /// </summary>
    /// <param name="logger">Logger used by the base consumer.</param>
    /// <param name="scopeFactory">Factory used by the base consumer to create a scope per message.</param>
    /// <param name="brokerSettings">Broker settings; the email entity name is read from them.</param>
    public AzureBrokerEmailConsumerBackService(
        ILogger<AzureBrokerEmailConsumerBackService> logger,
        IServiceScopeFactory scopeFactory,
        MessageBrokerSettings brokerSettings)
        : base(logger, scopeFactory, brokerSettings)
    {
    }

    /// <summary>
    /// Name of the entity this consumer listens on.
    /// </summary>
    protected override NonNullableString MessageTopic
    {
        get { return BrokerSettings.EmailMessageTopic; }
    }

    /// <summary>
    /// Handles one received message by sending its body through <see cref="IEmail"/>.
    /// </summary>
    /// <param name="provider">Service provider used to resolve <see cref="IEmail"/>.</param>
    /// <param name="message">The received Service Bus message.</param>
    /// <returns>The task returned by <c>IEmail.SendAsync</c>.</returns>
    protected override Task MessageHandleInternalAsync(IServiceProvider provider, ServiceBusReceivedMessage message)
    {
        // The body is read before the service is resolved, matching the original statement order.
        string payload = message.Body.ToString();
        var emailSender = provider.GetRequiredService<IEmail>();
        return emailSender.SendAsync(payload);
    }
}
}
using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Services.Infrastructure.Azure;
using Utils.ValueObjects;
namespace YourNamespace
{
/// <summary>
/// Base class for background services that consume messages from an Azure Service Bus entity.
/// Each received message is handled inside a fresh dependency-injection scope and then completed.
/// </summary>
public abstract class AzureBusTopicConsumerBase : BackgroundService
{
    private readonly ILogger _logger;

    /// <summary>Factory used to create one service scope per received message.</summary>
    protected IServiceScopeFactory ScopeFactory { get; }

    /// <summary>Broker settings; the connection string is read from them.</summary>
    protected MessageBrokerSettings BrokerSettings { get; }

    /// <summary>Name of the Service Bus entity the processor is created for.</summary>
    protected abstract NonNullableString MessageTopic { get; }

    /// <summary>
    /// Stores the dependencies shared by all consumers.
    /// </summary>
    /// <param name="logger">Logger for lifecycle and error messages.</param>
    /// <param name="scopeFactory">Factory for per-message service scopes.</param>
    /// <param name="brokerSettings">Broker connection settings.</param>
    protected AzureBusTopicConsumerBase(
        ILogger logger, IServiceScopeFactory scopeFactory, MessageBrokerSettings brokerSettings)
    {
        _logger = logger;
        ScopeFactory = scopeFactory;
        BrokerSettings = brokerSettings;
    }

    /// <summary>
    /// Starts a Service Bus processor and keeps it running until the host requests shutdown,
    /// then stops it. The client and the processor are disposed when the method exits.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // NonNullableString only offers an explicit conversion to string, so convert explicitly.
        await using var client = new ServiceBusClient(BrokerSettings.Connection.ToString());

        // NOTE(review): the processor is created through the queue overload although the
        // member is called MessageTopic; confirm the entity really is a queue.
        await using var processor = client.CreateProcessor(
            queueName: MessageTopic.ToString(),
            options: new ServiceBusProcessorOptions());

        processor.ProcessMessageAsync += MessageHandlerAsync;
        processor.ProcessErrorAsync += ErrorHandlerAsync;

        _logger.LogInformation("Starting consuming");
        await processor.StartProcessingAsync(stoppingToken);

        try
        {
            // Park until shutdown. Task.Delay throws once the token is cancelled.
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // Expected on shutdown: fall through to stop the processor.
        }

        _logger.LogInformation("Stopping consuming...");

        // The stopping token is already cancelled here; passing it would abort the stop immediately.
        await processor.StopProcessingAsync(CancellationToken.None);
    }

    /// <summary>
    /// Handles a single received message. Called once per message with a scoped provider.
    /// </summary>
    /// <param name="provider">Provider of the scope created for this message.</param>
    /// <param name="message">The received message.</param>
    protected abstract Task MessageHandleInternalAsync(
        IServiceProvider provider, ServiceBusReceivedMessage message);

    /// <summary>
    /// Processor callback: runs the derived handler in a new scope and completes the message afterwards.
    /// If the handler throws, the message is not completed by this method.
    /// </summary>
    private async Task MessageHandlerAsync(ProcessMessageEventArgs args)
    {
        _logger.LogInformation("Received email message");

        using var scope = ScopeFactory.CreateScope();
        await MessageHandleInternalAsync(scope.ServiceProvider, args.Message);

        // Completing the message removes it from the entity.
        await args.CompleteMessageAsync(args.Message);
    }

    /// <summary>
    /// Processor callback for errors: logs the exception together with where it occurred.
    /// </summary>
    private Task ErrorHandlerAsync(ProcessErrorEventArgs args)
    {
        // A constant template is used; exception text may contain braces and must not be a format string.
        _logger.LogError(
            args.Exception,
            "Service Bus processing error. Source: {ErrorSource}, entity: {EntityPath}",
            args.ErrorSource,
            args.EntityPath);

        return Task.CompletedTask;
    }
}
}
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Services.Infrastructure.Azure;
using Web.Infrastructure.MessageBrokers;
namespace YourNamespace
{
/// <summary>
/// Publisher that serializes a message to JSON and sends it through Azure Service Bus.
/// </summary>
public class AzureServiceBusPublisher : BrokerPublisherBase
{
    private readonly MessageBrokerSettings _config;

    /// <summary>
    /// Creates the publisher.
    /// </summary>
    /// <param name="configuration">Broker settings; the connection string is read from them.</param>
    /// <param name="logger">Logger handed to the base publisher.</param>
    public AzureServiceBusPublisher(MessageBrokerSettings configuration, ILogger<AzureServiceBusPublisher> logger)
        : base(logger)
        => _config = configuration;

    /// <summary>
    /// Sends <paramref name="message"/>, serialized with Json.NET, to the entity named <paramref name="topicName"/>.
    /// </summary>
    /// <typeparam name="T">Message type.</typeparam>
    /// <param name="topicName">Name of the entity the sender is created for.</param>
    /// <param name="message">Message to serialize and send.</param>
    protected override async Task PublishInternalAsync<T>(string topicName, T message)
    {
        // NOTE(review): a new client is opened and disposed for every publish; confirm this is
        // acceptable for the expected message volume.
        var connectionString = _config.Connection.ToString();
        await using var client = new ServiceBusClient(connectionString);

        ServiceBusSender sender = client.CreateSender(topicName);

        var json = JsonConvert.SerializeObject(message);
        var envelope = new ServiceBusMessage(json);
        await sender.SendMessageAsync(envelope);
    }
}
}
namespace YourNamespace
{
/// <summary>
/// Boolean value parsed from a string. Any input that <see cref="bool.TryParse(string, out bool)"/>
/// does not accept (including <c>null</c>) is treated as <c>false</c>.
/// </summary>
public record Bool
{
    private readonly string _source;

    // Parsed once in the constructor. A lazily filled cache would take part in the record's
    // generated equality/hash code and make equal instances compare unequal after ToBool().
    private readonly bool _value;

    /// <summary>
    /// Null is allowed.
    /// </summary>
    /// <param name="source">Source.</param>
    public Bool(string source)
    {
        _source = source;

        // bool.TryParse returns false for null, so no separate null check is needed.
        _value = bool.TryParse(source, out var parsed) && parsed;
    }

    /// <summary>
    /// Returns the parsed value; <c>false</c> when the source was null or not a valid boolean.
    /// </summary>
    public bool ToBool() => _value;

    /// <summary>
    /// Compares the parsed value with a plain <see cref="bool"/>.
    /// </summary>
    /// <param name="second">Value to compare with.</param>
    public bool Equals(bool second) => second == _value;
}
}
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Services.Infrastructure.Azure;
using Utils.Helpers;
namespace YourNamespace
{
/// <summary>
/// Base class for message publishers: validates the arguments, logs around the publish call
/// and delegates the actual transport work to <see cref="PublishInternalAsync{T}"/>.
/// </summary>
public abstract class BrokerPublisherBase : IMessageBroker
{
    private readonly ILogger _logger;

    /// <summary>
    /// Stores the logger used for publish diagnostics.
    /// </summary>
    /// <param name="logger">Logger.</param>
    protected BrokerPublisherBase(ILogger logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Transport-specific publish. Called only after the arguments passed validation.
    /// </summary>
    /// <typeparam name="T">Message type.</typeparam>
    /// <param name="topicName">Destination name.</param>
    /// <param name="message">Message to publish.</param>
    protected abstract Task PublishInternalAsync<T>(string topicName, T message)
        where T : class;

    /// <summary>
    /// Validates the arguments and publishes the message. Any exception thrown by the transport
    /// is logged together with the message and rethrown unchanged.
    /// </summary>
    /// <typeparam name="T">Message type.</typeparam>
    /// <param name="topicName">Destination name; checked with <c>ThrowIfNullOrEmpty</c>.</param>
    /// <param name="message">Message to publish; checked with <c>ThrowIfNull</c>.</param>
    public async Task PublishAsync<T>(string topicName, T message)
        where T : class
    {
        topicName.ThrowIfNullOrEmpty(nameof(topicName));
        message.ThrowIfNull(nameof(message));

        try
        {
            // Message templates instead of interpolation: the text is only built when the level is enabled.
            _logger.LogDebug("Publishing message {Message}", message);
            await PublishInternalAsync(topicName, message);
            _logger.LogDebug("Sent a single message to the queue: {Message}", message);
        }
        catch (System.Exception ex)
        {
            // Pass the exception to the logger so the failure cause and stack trace are recorded.
            _logger.LogError(ex, "The message {Message} was not published due to error", message);
            throw;
        }
    }
}
}
using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.Logging;
using Services;
namespace YourNamespace
{
/// <summary>
/// Base MassTransit consumer that wraps the derived handler with start/finish logging and
/// logs (without rethrowing) any exception the handler throws.
/// </summary>
/// <typeparam name="T">Consumed message type.</typeparam>
public abstract class ConsumerBase<T> : IConsumer<T>
    where T : class
{
    /// <summary>Logger available to derived consumers.</summary>
    protected ILogger Logger { get; }

    /// <summary>
    /// Stores the logger.
    /// </summary>
    /// <param name="logger">Logger.</param>
    protected ConsumerBase(ILogger logger) => Logger = logger;

    /// <summary>
    /// Message handling implemented by the derived consumer.
    /// </summary>
    /// <param name="context">Consume context of the current message.</param>
    protected abstract Task ConsumeAsync(ConsumeContext<T> context);

    /// <summary>
    /// MassTransit entry point. Exceptions from <see cref="ConsumeAsync"/> are logged with
    /// <c>EventIdFactory.ConsumerError</c> and are not propagated.
    /// </summary>
    /// <param name="context">Consume context of the current message.</param>
#pragma warning disable UseAsyncSuffix // Use Async suffix
    public async Task Consume(ConsumeContext<T> context)
#pragma warning restore UseAsyncSuffix // Use Async suffix
    {
        string typeName = typeof(T).Name;

        try
        {
            Logger.LogInformation($"Consuming {typeName}");
            await ConsumeAsync(context);
            Logger.LogInformation($"Consumed {typeName}");
        }
        catch (Exception error)
        {
            Logger.LogError(
                EventIdFactory.ConsumerError,
                error,
                $"An error during consuming {typeName}:{Environment.NewLine}{error.Message}");
        }
    }
}
}
using System.Threading.Tasks;
namespace YourNamespace
{
/// <summary>
/// Abstraction over the message broker used to publish messages.
/// </summary>
public interface IMessageBroker
{
/// <summary>
/// Publishes a message.
/// </summary>
/// <typeparam name="T">Message type; must be a reference type.</typeparam>
/// <param name="topicName">Name of the destination the message is published to.</param>
/// <param name="message">Message to publish.</param>
/// <returns>A task representing the publish operation.</returns>
Task PublishAsync<T>(string topicName, T message)
where T : class;
}
}
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.Logging;
using Web.Infrastructure.MessageBrokers;
namespace YourNamespace
{
/// <summary>
/// Publisher that hands messages to MassTransit through <see cref="IPublishEndpoint"/>.
/// </summary>
public class InMemoryBrokerPublisher : BrokerPublisherBase
{
    private readonly IPublishEndpoint _publish;

    /// <summary>
    /// Creates the publisher.
    /// </summary>
    /// <param name="publish">MassTransit publish endpoint.</param>
    /// <param name="logger">Logger handed to the base publisher.</param>
    public InMemoryBrokerPublisher(IPublishEndpoint publish, ILogger<InMemoryBrokerPublisher> logger)
        : base(logger)
        => _publish = publish;

    /// <summary>
    /// Publishes <paramref name="message"/> via MassTransit. <paramref name="topicName"/> is not used here.
    /// </summary>
    /// <typeparam name="T">Message type.</typeparam>
    /// <param name="topicName">Ignored by this implementation.</param>
    /// <param name="message">Message to publish.</param>
    protected override Task PublishInternalAsync<T>(string topicName, T message)
        => _publish.Publish(message);
}
}
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.Logging;
using Services.Emails;
using Services.ServiceBusMessages;
namespace YourNamespace
{
/// <summary>
/// MassTransit consumer that sends every received <see cref="EmailMessage"/> through <see cref="IEmail"/>.
/// </summary>
public class MassTransitEmailSendConsumer : ConsumerBase<EmailMessage>
{
    private readonly IEmail _email;

    /// <summary>
    /// Creates the consumer.
    /// </summary>
    /// <param name="logger">Logger handed to the base consumer.</param>
    /// <param name="email">Service used to send the email.</param>
    public MassTransitEmailSendConsumer(ILogger<MassTransitEmailSendConsumer> logger, IEmail email)
        : base(logger)
        => _email = email;

    /// <summary>
    /// Sends the email carried by the consumed message and logs a debug entry afterwards.
    /// </summary>
    /// <param name="context">Consume context holding the <see cref="EmailMessage"/>.</param>
    protected override async Task ConsumeAsync(ConsumeContext<EmailMessage> context)
    {
        EmailMessage payload = context.Message;
        await _email.SendAsync(payload);

        Logger.LogDebug("Email sent");
    }
}
}
using MassTransit;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Services.Infrastructure.Azure;
using Utils.ValueObjects;
using Web.Azure.ServiceBus;
using Web.Infrastructure.MassTransitConsumers;
namespace YourNamespace
{
/// <summary>
/// Registers message-broker services: either the MassTransit in-memory transport or
/// Azure Service Bus, chosen by the <c>UseInMemoryMessageBroker</c> configuration value.
/// </summary>
public class MessageBrokerConfig
{
    private readonly IServiceCollection _services;
    private readonly Bool _useInMemoryMb;
    private readonly MessageBrokerSettings _configuration;

    /// <summary>
    /// Reads the broker switch and the broker settings from configuration.
    /// </summary>
    /// <param name="services">Service collection to register into.</param>
    /// <param name="configuration">Application configuration.</param>
    public MessageBrokerConfig(
        IServiceCollection services,
        IConfiguration configuration)
    {
        _services = services;
        _useInMemoryMb = new Bool(configuration["UseInMemoryMessageBroker"]);
        _configuration = new MessageBrokerSettings(configuration);
    }

    /// <summary>
    /// Registers <see cref="MessageBrokerSettings"/> and the services of the selected broker.
    /// </summary>
    /// <returns>This instance, to allow chaining.</returns>
    public MessageBrokerConfig Setup()
    {
        _services.AddTransient<MessageBrokerSettings>();
        return _useInMemoryMb.ToBool() ? InMemoryMb() : AzureServiceBus();
    }

    /// <summary>
    /// Registers MassTransit with the in-memory transport, the email consumer and the in-memory publisher.
    /// </summary>
    private MessageBrokerConfig InMemoryMb()
    {
        _services.AddMassTransit(x =>
        {
            x.AddConsumer<MassTransitEmailSendConsumer>();
            x.UsingInMemory((context, cfg) =>
            {
                cfg.TransportConcurrencyLimit = 100;

                cfg.ReceiveEndpoint(_configuration.EmailMessageTopic.ToString(), e =>
                {
                    e.ConfigureConsumer<MassTransitEmailSendConsumer>(context);
                });

                // Must come after the explicit endpoints: MassTransit only skips consumers that
                // were already configured manually, so calling it first would bind the email
                // consumer to an auto-generated endpoint as well as the explicit one.
                cfg.ConfigureEndpoints(context);
            });
        });
        _services.AddMassTransitHostedService();
        _services.AddScoped<IMessageBroker, InMemoryBrokerPublisher>();
        return this;
    }

    /// <summary>
    /// Registers the Azure Service Bus consumer, publisher and the Service Bus topic health check.
    /// </summary>
    private MessageBrokerConfig AzureServiceBus()
    {
        _services.AddHostedService<AzureBrokerEmailConsumerBackService>();
        _services.AddScoped<IMessageBroker, AzureServiceBusPublisher>();
        _services
            .AddHealthChecks()
            .AddAzureServiceBusTopic(
                connectionString: _configuration.HealthCheckConnection.ToString(),
                topicName: _configuration.HealthCheckTopic.ToString());
        return this;
    }
}
}
using Microsoft.Extensions.Configuration;
using Utils.ValueObjects;
namespace YourNamespace
{
/// <summary>
/// Strongly typed view of the message-broker configuration values.
/// Values are read from the <c>Azure:ServiceBus</c> section when it exists; otherwise from the
/// top-level <c>MessageBroker</c> section.
/// </summary>
public class MessageBrokerSettings
{
    /// <summary>Connection string used for publishing and consuming.</summary>
    public NonNullableString Connection { get; }

    /// <summary>Name of the entity that carries email messages.</summary>
    public NonNullableString EmailMessageTopic { get; }

    /// <summary>Connection string used by the health check.</summary>
    public NonNullableString HealthCheckConnection { get; }

    /// <summary>Topic name used by the health check.</summary>
    public NonNullableString HealthCheckTopic { get; }

    /// <summary>
    /// Reads the settings. Missing values do not fail here; they throw when the value is accessed.
    /// </summary>
    /// <param name="configuration">Application configuration.</param>
    public MessageBrokerSettings(IConfiguration configuration)
    {
        var section = ResolveSection(configuration);

        Connection = Read(section, nameof(Connection));
        EmailMessageTopic = Read(section, nameof(EmailMessageTopic));
        HealthCheckConnection = Read(section, nameof(HealthCheckConnection));
        HealthCheckTopic = Read(section, nameof(HealthCheckTopic));
    }

    // Picks "Azure:ServiceBus" when present (the original location), else "MessageBroker".
    private static IConfigurationSection ResolveSection(IConfiguration configuration)
    {
        var primary = configuration.GetSection("Azure").GetSection("ServiceBus");
        return primary.Exists() ? primary : configuration.GetSection("MessageBroker");
    }

    // Wraps one value and records its key so a missing setting is named in the error message.
    private static NonNullableString Read(IConfigurationSection section, string key)
    {
        return new NonNullableString(section[key], key);
    }
}
}
using System;
namespace YourNamespace
{
/// <summary>
/// String wrapper whose value access throws when the wrapped string is null.
/// </summary>
public record NonNullableString : NonNullable<string>
{
    /// <summary>
    /// Wraps a string and remembers the name to mention in the error message.
    /// </summary>
    /// <param name="value">Wrapped string; null is accepted here and reported on access.</param>
    /// <param name="paramName">Name used in the error message.</param>
    public NonNullableString(string value, string paramName)
        : base(value, paramName)
    {
    }

    /// <summary>
    /// Wraps a string without a name for the error message.
    /// </summary>
    /// <param name="value">Wrapped string; null is accepted here and reported on access.</param>
    public NonNullableString(string value)
        : base(value)
    {
    }

    /// <summary>
    /// Returns the wrapped string; throws like <c>Value()</c> when it is null.
    /// </summary>
    public override string ToString() => Value();

    /// <summary>
    /// Explicit conversion to the wrapped string; throws like <c>Value()</c> when it is null.
    /// </summary>
    public static explicit operator string(NonNullableString nnString) => nnString.Value();
}
/// <summary>
/// Wrapper around a reference that defers the null check to the moment the value is read.
/// </summary>
/// <typeparam name="T">Wrapped reference type.</typeparam>
public record NonNullable<T>
    where T : class
{
    private readonly T _value;
    private readonly string _paramName;

    /// <summary>
    /// Wraps a value and remembers the name to mention if it turns out to be null.
    /// </summary>
    /// <param name="value">Wrapped value; may be null.</param>
    /// <param name="paramName">Name used in the error message; may be null.</param>
    public NonNullable(T value, string paramName)
    {
        _value = value;
        _paramName = paramName;
    }

    /// <summary>
    /// Wraps a value without a name for the error message.
    /// </summary>
    /// <param name="value">Wrapped value; may be null.</param>
    public NonNullable(T value)
        : this(value, null)
    {
    }

    /// <summary>
    /// Returns the wrapped value.
    /// </summary>
    /// <exception cref="InvalidOperationException">The wrapped value is null.</exception>
    public T Value()
    {
        if (_value == null)
        {
            throw new InvalidOperationException(
                _paramName == null
                    ? "The passed value is null"
                    : $"The variable '{_paramName}' is null");
        }

        return _value;
    }
}
}
// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
    // ... other service registrations go here ...

    // Register the message broker selected by configuration.
    var brokerConfig = new MessageBrokerConfig(services, Configuration);
    brokerConfig.Setup();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment