Last active
October 10, 2023 18:18
-
-
Save maximgorbatyuk/b772cb40d5bea823be8dc828e7e4ac27 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"MessageBroker": { | |
"Connection": "Endpoint=sb://yournamespace.windows.net/;SharedAccessKeyName=email;SharedAccessKey=awesomesecret", | |
"EmailMessageTopic": "email-message-queue", | |
"HealthCheckConnection": "Endpoint=sb://yournamespace.windows.net/;SharedAccessKeyName=healthcheck;SharedAccessKey=awesomesecret", | |
"HealthCheckTopic": "azuretopic" | |
}, | |
"UseInMemoryMessageBroker": true, |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Threading.Tasks; | |
using Azure.Messaging.ServiceBus; | |
using Microsoft.Extensions.DependencyInjection; | |
using Microsoft.Extensions.Logging; | |
using Services.Emails; | |
using Services.Infrastructure.Azure; | |
using Utils.ValueObjects; | |
namespace YourNamespace | |
{ | |
// https://gist.github.com/danielhunex/c1cfed093396b6b43ec927c63f3540af | |
public class AzureBrokerEmailConsumerBackService : AzureBusTopicConsumerBase | |
{ | |
public AzureBrokerEmailConsumerBackService( | |
ILogger<AzureBrokerEmailConsumerBackService> logger, | |
IServiceScopeFactory scopeFactory, | |
MessageBrokerSettings brokerSettings) | |
: base( | |
logger, | |
scopeFactory, | |
brokerSettings) | |
{ | |
} | |
// handle received messages | |
protected override NonNullableString MessageTopic => BrokerSettings.EmailMessageTopic; | |
protected override Task MessageHandleInternalAsync(IServiceProvider provider, ServiceBusReceivedMessage message) | |
{ | |
string body = message.Body.ToString(); | |
var email = provider.GetRequiredService<IEmail>(); | |
return email.SendAsync(body); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Azure.Messaging.ServiceBus; | |
using Microsoft.Extensions.DependencyInjection; | |
using Microsoft.Extensions.Hosting; | |
using Microsoft.Extensions.Logging; | |
using Services.Infrastructure.Azure; | |
using Utils.ValueObjects; | |
namespace YourNamespace | |
{ | |
public abstract class AzureBusTopicConsumerBase : BackgroundService | |
{ | |
private const int DefaultDelay = 3000; | |
private readonly ILogger _logger; | |
protected IServiceScopeFactory ScopeFactory { get; } | |
protected MessageBrokerSettings BrokerSettings { get; } | |
protected abstract NonNullableString MessageTopic { get; } | |
protected AzureBusTopicConsumerBase( | |
ILogger logger, IServiceScopeFactory scopeFactory, MessageBrokerSettings brokerSettings) | |
{ | |
_logger = logger; | |
ScopeFactory = scopeFactory; | |
BrokerSettings = brokerSettings; | |
} | |
protected override async Task ExecuteAsync(CancellationToken stoppingToken) | |
{ | |
await using var client = new ServiceBusClient(BrokerSettings.Connection); | |
await using var processor = client.CreateProcessor( | |
queueName: MessageTopic, | |
options: new ServiceBusProcessorOptions()); | |
processor.ProcessMessageAsync += MessageHandlerAsync; | |
processor.ProcessErrorAsync += ErrorHandlerAsync; | |
_logger.LogInformation("Starting consuming"); | |
await processor.StartProcessingAsync(stoppingToken); | |
while (!stoppingToken.IsCancellationRequested) | |
{ | |
await Task.Delay(DefaultDelay, stoppingToken); | |
} | |
_logger.LogInformation("Stopping consuming..."); | |
await processor.StopProcessingAsync(stoppingToken); | |
} | |
protected abstract Task MessageHandleInternalAsync( | |
IServiceProvider provider, ServiceBusReceivedMessage message); | |
// handle received messages | |
private async Task MessageHandlerAsync(ProcessMessageEventArgs args) | |
{ | |
_logger.LogInformation("Received email message"); | |
using var scope = ScopeFactory.CreateScope(); | |
await MessageHandleInternalAsync(scope.ServiceProvider, args.Message); | |
// complete the message. messages is deleted from the queue. | |
await args.CompleteMessageAsync(args.Message); | |
} | |
// handle any errors when receiving messages | |
private Task ErrorHandlerAsync(ProcessErrorEventArgs args) | |
{ | |
_logger.LogError(args.Exception, args.Exception.ToString()); | |
return Task.CompletedTask; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Threading.Tasks; | |
using Azure.Messaging.ServiceBus; | |
using Microsoft.Extensions.Logging; | |
using Newtonsoft.Json; | |
using Services.Infrastructure.Azure; | |
using Web.Infrastructure.MessageBrokers; | |
namespace YourNamespace | |
{ | |
public class AzureServiceBusPublisher : BrokerPublisherBase | |
{ | |
private readonly MessageBrokerSettings _config; | |
public AzureServiceBusPublisher(MessageBrokerSettings configuration, ILogger<AzureServiceBusPublisher> logger) | |
: base(logger) | |
{ | |
_config = configuration; | |
} | |
protected override async Task PublishInternalAsync<T>(string topicName, T message) | |
{ | |
// create a Service Bus client | |
await using var client = new ServiceBusClient(_config.Connection.ToString()); | |
ServiceBusSender sender = client.CreateSender(topicName); | |
// create a message that we can send | |
// send the message | |
await sender.SendMessageAsync( | |
new ServiceBusMessage(JsonConvert.SerializeObject(message))); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace YourNamespace | |
{ | |
public record Bool | |
{ | |
private readonly string _source; | |
private bool? _value; | |
/// <summary> | |
/// Null is allowed. | |
/// </summary> | |
/// <param name="source">Source.</param> | |
public Bool(string source) | |
{ | |
_source = source; | |
} | |
public bool ToBool() | |
{ | |
_value ??= _source != null && bool.TryParse(_source, out var result) && result; | |
return _value.Value; | |
} | |
public bool Equals(bool second) => second == ToBool(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Threading.Tasks; | |
using Microsoft.Extensions.Logging; | |
using Services.Infrastructure.Azure; | |
using Utils.Helpers; | |
namespace YourNamespace | |
{ | |
public abstract class BrokerPublisherBase : IMessageBroker | |
{ | |
private readonly ILogger _logger; | |
protected BrokerPublisherBase(ILogger logger) | |
{ | |
_logger = logger; | |
} | |
protected abstract Task PublishInternalAsync<T>(string topicName, T message) | |
where T : class; | |
public async Task PublishAsync<T>(string topicName, T message) | |
where T : class | |
{ | |
topicName.ThrowIfNullOrEmpty(nameof(topicName)); | |
message.ThrowIfNull(nameof(message)); | |
try | |
{ | |
_logger.LogDebug($"Publishing message {message}"); | |
await PublishInternalAsync(topicName, message); | |
_logger.LogDebug($"Sent a single message to the queue: {message}"); | |
} | |
catch | |
{ | |
_logger.LogError($"The message {message} was not published due to error"); | |
throw; | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Threading.Tasks; | |
using MassTransit; | |
using Microsoft.Extensions.Logging; | |
using Services; | |
namespace YourNamespace | |
{ | |
public abstract class ConsumerBase<T> : IConsumer<T> | |
where T : class | |
{ | |
protected ConsumerBase(ILogger logger) | |
{ | |
Logger = logger; | |
} | |
protected ILogger Logger { get; } | |
protected abstract Task ConsumeAsync(ConsumeContext<T> context); | |
#pragma warning disable UseAsyncSuffix // Use Async suffix | |
public async Task Consume(ConsumeContext<T> context) | |
#pragma warning restore UseAsyncSuffix // Use Async suffix | |
{ | |
var messageType = typeof(T).Name; | |
try | |
{ | |
Logger.LogInformation($"Consuming {messageType}"); | |
await ConsumeAsync(context); | |
Logger.LogInformation($"Consumed {messageType}"); | |
} | |
catch (Exception e) | |
{ | |
Logger.LogError( | |
eventId: EventIdFactory.ConsumerError, | |
exception: e, | |
message: $"An error during consuming {messageType}:{Environment.NewLine}{e.Message}"); | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Threading.Tasks; | |
namespace YourNamespace | |
{ | |
public interface IMessageBroker | |
{ | |
Task PublishAsync<T>(string topicName, T message) | |
where T : class; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Threading.Tasks; | |
using MassTransit; | |
using Microsoft.Extensions.Logging; | |
using Web.Infrastructure.MessageBrokers; | |
namespace YourNamespace | |
{ | |
public class InMemoryBrokerPublisher : BrokerPublisherBase | |
{ | |
private readonly IPublishEndpoint _publish; | |
public InMemoryBrokerPublisher(IPublishEndpoint publish, ILogger<InMemoryBrokerPublisher> logger) | |
: base(logger) | |
{ | |
_publish = publish; | |
} | |
protected override Task PublishInternalAsync<T>(string topicName, T message) | |
{ | |
return _publish.Publish(message); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Threading.Tasks; | |
using MassTransit; | |
using Microsoft.Extensions.Logging; | |
using Services.Emails; | |
using Services.ServiceBusMessages; | |
namespace YourNamespace | |
{ | |
public class MassTransitEmailSendConsumer : ConsumerBase<EmailMessage> | |
{ | |
private readonly IEmail _email; | |
protected override async Task ConsumeAsync(ConsumeContext<EmailMessage> context) | |
{ | |
await _email.SendAsync(context.Message); | |
Logger.LogDebug("Email sent"); | |
} | |
public MassTransitEmailSendConsumer(ILogger<MassTransitEmailSendConsumer> logger, IEmail email) | |
: base(logger) | |
{ | |
_email = email; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using MassTransit; | |
using Microsoft.Extensions.Configuration; | |
using Microsoft.Extensions.DependencyInjection; | |
using Services.Infrastructure.Azure; | |
using Utils.ValueObjects; | |
using Web.Azure.ServiceBus; | |
using Web.Infrastructure.MassTransitConsumers; | |
namespace YourNamespace | |
{ | |
public class MessageBrokerConfig | |
{ | |
private readonly IServiceCollection _services; | |
private readonly Bool _useInMemoryMb; | |
private readonly MessageBrokerSettings _configuration; | |
public MessageBrokerConfig( | |
IServiceCollection services, | |
IConfiguration configuration) | |
{ | |
_services = services; | |
_useInMemoryMb = new Bool(configuration["UseInMemoryMessageBroker"]); | |
_configuration = new MessageBrokerSettings(configuration); | |
} | |
public MessageBrokerConfig Setup() | |
{ | |
_services.AddTransient<MessageBrokerSettings>(); | |
return _useInMemoryMb.ToBool() ? InMemoryMb() : AzureServiceBus(); | |
} | |
private MessageBrokerConfig InMemoryMb() | |
{ | |
_services.AddMassTransit(x => | |
{ | |
x.AddConsumer<MassTransitEmailSendConsumer>(); | |
x.UsingInMemory((context, cfg) => | |
{ | |
cfg.TransportConcurrencyLimit = 100; | |
cfg.ConfigureEndpoints(context); | |
cfg.ReceiveEndpoint(_configuration.EmailMessageTopic.ToString(), e => | |
{ | |
e.ConfigureConsumer<MassTransitEmailSendConsumer>(context); | |
}); | |
}); | |
}); | |
_services.AddMassTransitHostedService(); | |
_services.AddScoped<IMessageBroker, InMemoryBrokerPublisher>(); | |
return this; | |
} | |
private MessageBrokerConfig AzureServiceBus() | |
{ | |
_services.AddHostedService<AzureBrokerEmailConsumerBackService>(); | |
_services.AddScoped<IMessageBroker, AzureServiceBusPublisher>(); | |
_services | |
.AddHealthChecks() | |
.AddAzureServiceBusTopic( | |
connectionString: _configuration.HealthCheckConnection.ToString(), | |
topicName: _configuration.HealthCheckTopic.ToString()); | |
return this; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Microsoft.Extensions.Configuration; | |
using Utils.ValueObjects; | |
namespace YourNamespace | |
{ | |
public class MessageBrokerSettings | |
{ | |
public NonNullableString Connection { get; } | |
public NonNullableString EmailMessageTopic { get; } | |
public NonNullableString HealthCheckConnection { get; } | |
public NonNullableString HealthCheckTopic { get; } | |
public MessageBrokerSettings(IConfiguration configuration) | |
{ | |
var section = configuration.GetSection("Azure").GetSection("ServiceBus"); | |
Connection = new NonNullableString(section[nameof(Connection)]); | |
EmailMessageTopic = new NonNullableString(section[nameof(EmailMessageTopic)]); | |
HealthCheckConnection = new NonNullableString(section[nameof(HealthCheckConnection)]); | |
HealthCheckTopic = new NonNullableString(section[nameof(HealthCheckTopic)]); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
namespace YourNamespace | |
{ | |
public record NonNullableString : NonNullable<string> | |
{ | |
public NonNullableString(string value) | |
: base(value) | |
{ | |
} | |
public NonNullableString(string value, string paramName) | |
: base(value, paramName) | |
{ | |
} | |
public static explicit operator string(NonNullableString nnString) | |
{ | |
return nnString.Value(); | |
} | |
public override string ToString() | |
{ | |
return Value(); | |
} | |
} | |
public record NonNullable<T> | |
where T : class | |
{ | |
private readonly T _value; | |
private readonly string _paramName; | |
public NonNullable(T value) | |
: this(value, null) | |
{ | |
} | |
public NonNullable(T value, string paramName) | |
{ | |
_value = value; | |
_paramName = paramName; | |
} | |
public T Value() | |
{ | |
if (_value != null) | |
{ | |
return _value; | |
} | |
var message = _paramName != null | |
? $"The variable '{_paramName}' is null" | |
: "The passed value is null"; | |
throw new InvalidOperationException(message); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This method gets called by the runtime. Use this method to add services to the container. | |
public void ConfigureServices(IServiceCollection services) | |
{ | |
// code goes here | |
// code goes here | |
// code goes here | |
new MessageBrokerConfig(services, Configuration).Setup(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment