Skip to content

Instantly share code, notes, and snippets.

@djeikyb
Created October 16, 2023 23:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save djeikyb/32d7882bca19334fb047d1993d514b25 to your computer and use it in GitHub Desktop.
Save djeikyb/32d7882bca19334fb047d1993d514b25 to your computer and use it in GitHub Desktop.
RabbitMQ, MassTransit, and the .NET generic host in a CLI program.
using System.ComponentModel.DataAnnotations;
using System.Net.Mime;
using System.Text;
using MassTransit;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Poc.MassTransit;
/// <summary>
/// Console entry point: builds a generic host that wires MassTransit to RabbitMQ,
/// registers a consumer on the <c>my_queue</c> receive endpoint, and starts the
/// <see cref="Worker"/> publisher.
/// </summary>
internal class Program
{
    // Text that precedes the quoted error inside each validation failure string.
    // Assumes failures look like "... with the error: '<message>'." — if the
    // format differs, the whole failure string is printed instead.
    private const string ErrorMarker = "with the error: '";

    /// <summary>
    /// Builds and runs the host until shutdown.
    /// </summary>
    /// <param name="args">Command-line arguments (currently not forwarded to the host builder).</param>
    /// <returns>
    /// <c>0</c> when the host stops normally; <c>-1</c> when an
    /// <see cref="OptionsValidationException"/> surfaces, after writing a summary
    /// of the failures to standard error.
    /// </returns>
    public static async Task<int> Main(string[] args)
    {
        var host = Host.CreateDefaultBuilder()
            .ConfigureHostConfiguration(b => b.AddJsonFile("appsettings.json"))
            .ConfigureServices((_, services) =>
            {
                // Bind and validate the broker settings from the "app:rabbit" section.
                services.AddOptions<RabbitOptions>()
                    .ValidateDataAnnotations()
                    .BindConfiguration("app:rabbit");

                services.AddTransient<SomeMessageConsumer>();

                services.AddMassTransit(bus =>
                {
                    bus.UsingRabbitMq((context, cfg) =>
                    {
                        var rabbitOptions = context.GetRequiredService<IOptions<RabbitOptions>>().Value;

                        cfg.Host(rabbitOptions.Host, rabbitOptions.Port, rabbitOptions.VirtualHost, h =>
                        {
                            h.Username(rabbitOptions.Username);
                            h.Password(rabbitOptions.Password);
                        });

                        cfg.ReceiveEndpoint(
                            "my_queue", e =>
                            {
                                e.Consumer<SomeMessageConsumer>(context);
                                // Messages on this queue are plain JSON rather than a MassTransit envelope.
                                e.DefaultContentType = new ContentType("application/json");
                                e.UseRawJsonSerializer();
                            });

                        cfg.ConfigureEndpoints(context);
                    });
                });

                services.AddHostedService<Worker>();
            })
            .Build();

        try
        {
            await host.RunAsync();
        }
        catch (OptionsValidationException e)
        {
            var sb = new StringBuilder();
            sb.Append("Bad config for ").Append(e.OptionsType.Name).AppendLine(":");
            foreach (var failure in e.Failures)
            {
                sb.Append('\t').AppendLine(ExtractFailureMessage(failure));
            }

            Console.Error.WriteLine(sb);
            return -1;
        }

        return 0;
    }

    /// <summary>
    /// Returns the text between <see cref="ErrorMarker"/> and the last two characters of
    /// <paramref name="failure"/>, or <paramref name="failure"/> unchanged when the marker
    /// is absent or the string is too short to slice.
    /// </summary>
    private static string ExtractFailureMessage(string failure)
    {
        var markerAt = failure.IndexOf(ErrorMarker, StringComparison.Ordinal);
        if (markerAt < 0)
        {
            // Unknown format: show the raw failure rather than slicing at a bogus offset.
            return failure;
        }

        var from = markerAt + ErrorMarker.Length;
        var until = failure.Length - 2; // drop the trailing two characters (expected to be "'.")
        return from <= until ? failure[from..until] : failure;
    }
}
/// <summary>
/// Broker connection settings, validated with data annotations.
/// </summary>
public class RabbitOptions
{
    /// <summary>Broker host name.</summary>
    [Required] public string Host { get; set; } = null!;

    /// <summary>Broker port; must be non-zero.</summary>
    // Upper bound matches the ushort range instead of an unreachable int.MaxValue.
    [Required, Range(1, ushort.MaxValue)] public ushort Port { get; set; }

    /// <summary>User name used to authenticate with the broker.</summary>
    [Required] public string Username { get; set; } = null!;

    /// <summary>Password used to authenticate with the broker.</summary>
    [Required] public string Password { get; set; } = null!;

    /// <summary>Virtual host; <c>"/"</c> selects the root virtual host.</summary>
    [Required] public string VirtualHost { get; set; } = null!;

    /// <summary>
    /// Builds the <c>rabbitmq://</c> address of <paramref name="queue"/> on the configured
    /// host, port and virtual host.
    /// </summary>
    /// <param name="queue">Queue name appended as the last path segment.</param>
    /// <returns>
    /// <c>rabbitmq://{Host}:{Port}/{queue}</c> when <see cref="VirtualHost"/> is <c>"/"</c>,
    /// otherwise <c>rabbitmq://{Host}:{Port}/{VirtualHost}/{queue}</c>.
    /// </returns>
    public Uri Uri(string queue) =>
        new UriBuilder
        {
            Scheme = "rabbitmq",
            // Without an explicit host UriBuilder falls back to "localhost",
            // which ignores the configured broker.
            Host = Host,
            Port = Port,
            Path = VirtualHost == "/" ? $"/{queue}" : $"/{VirtualHost}/{queue}",
        }.Uri;
}
/// <summary>
/// Background publisher that sends a <see cref="SomeMessage"/> to <c>my_queue</c>
/// roughly once per second until the host requests shutdown.
/// </summary>
public class Worker(IOptions<RabbitOptions> rabbitOptions, IBus bus, ILogger<Worker> logger)
    : BackgroundService
{
    private readonly RabbitOptions _rabbitOptions = rabbitOptions.Value;

    /// <summary>
    /// Resolves the send endpoint for <c>my_queue</c> and publishes in a loop.
    /// </summary>
    /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
    /// <remarks>
    /// Failures are logged and end the loop; they are not rethrown.
    /// Cancellation caused by <paramref name="stoppingToken"/> is treated as a normal stop.
    /// </remarks>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        try
        {
            var endpoint = await bus.GetSendEndpoint(_rabbitOptions.Uri("my_queue"));
            while (!stoppingToken.IsCancellationRequested)
            {
                await endpoint.Send(new SomeMessage { Value = $"The time is {DateTimeOffset.Now}" }, stoppingToken);
                await Task.Delay(1000, stoppingToken);
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Graceful shutdown: Send/Delay were cancelled by the host, which is not an error.
        }
        catch (Exception e)
        {
            logger.LogError(e, "Publisher worker failed.");
        }
    }
}
/// <summary>
/// Message contract exchanged over <c>my_queue</c>.
/// </summary>
public class SomeMessage
{
    /// <summary>
    /// Text payload of the message; must be supplied when the object is created.
    /// </summary>
    public required string Value
    {
        get;
        set;
    }
}
/// <summary>
/// Consumer that logs the text of every <see cref="SomeMessage"/> it receives.
/// </summary>
public class SomeMessageConsumer(ILogger<SomeMessageConsumer> logger) : IConsumer<SomeMessage>
{
    /// <summary>
    /// Logs <c>context.Message.Value</c> at information level.
    /// </summary>
    /// <param name="context">Consume context carrying the received message.</param>
    /// <returns>An already-completed task; the work is synchronous.</returns>
    public Task Consume(ConsumeContext<SomeMessage> context)
    {
        logger.LogInformation("🐙 Received Text: {Text}", context.Message.Value);

        // Nothing is awaited here, so return a completed task instead of
        // marking the method async (which would trigger CS1998).
        return Task.CompletedTask;
    }
}
<Project Sdk="Microsoft.NET.Sdk">
<!-- Build settings for the console program: an executable targeting net8.0 with implicit usings and nullable reference types enabled. -->
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<!-- Turns on the configuration binding source generator. -->
<EnableConfigurationBindingGenerator>true</EnableConfigurationBindingGenerator>
</PropertyGroup>
<!-- NuGet dependencies. NOTE(review): the two Microsoft.Extensions packages are pinned to an 8.0.0 release candidate; consider moving to a stable release. -->
<ItemGroup>
<PackageReference Include="MassTransit.RabbitMQ" Version="8.1.1"/>
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0-rc.2.23479.6"/>
<PackageReference Include="Microsoft.Extensions.Options.DataAnnotations" Version="8.0.0-rc.2.23479.6"/>
</ItemGroup>
<!-- appsettings.json is copied to the output directory on every build so the program can load it at run time. -->
<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment