Skip to content

Instantly share code, notes, and snippets.

@derekgreer
Last active February 4, 2024 10:47
Show Gist options
  • Save derekgreer/5ba7205593ada09141d3bc9e68ad116c to your computer and use it in GitHub Desktop.
Save derekgreer/5ba7205593ada09141d3bc9e68ad116c to your computer and use it in GitHub Desktop.
MassTransit MultiTenant Message Filter Example
using System;
using System.Collections.Generic;
using System.Net.Security;
using System.Security.Authentication;
using System.Threading.Tasks;
using Autofac;
using Autofac.Core;
using GreenPipes;
using MassTransit;
using MassTransit.ConsumeConfigurators;
using MassTransit.PipeConfigurators;
using MassTransit.RabbitMqTransport;
using MyCompany.Common.Messaging.Options;
using MyCompany.MyProject.Consumers;
using Module = Autofac.Module;
namespace MyCompany.MyProject.Initialization.Modules
{
//public static class MultiTenantMessageFilterConfigurationExtensions
//{
// public static void UseMessageFilter(this IConsumePipeConfigurator configurator)
// {
// if (configurator == null)
// throw new ArgumentNullException(nameof(configurator));
// var observer = new MultiTenantMessageFilterConfigurationObserver(configurator);
// }
//}
/// <summary>
/// Consume-pipeline filter that opens a child Autofac lifetime scope for each message and
/// attaches it to the <see cref="ConsumeContext{T}"/> as an <see cref="ILifetimeScope"/> payload,
/// so later pipeline stages can find the scope on the context.
/// </summary>
/// <typeparam name="T">The message type flowing through the pipe.</typeparam>
public class MultiTenantMessageFilter<T> : IFilter<ConsumeContext<T>> where T : class
{
    readonly ILifetimeScope _mainScope;

    /// <summary>Creates the filter.</summary>
    /// <param name="mainScope">The parent scope from which per-message child scopes are begun.</param>
    /// <exception cref="ArgumentNullException"><paramref name="mainScope"/> is null.</exception>
    public MultiTenantMessageFilter(ILifetimeScope mainScope)
    {
        if (mainScope == null)
        {
            throw new ArgumentNullException(nameof(mainScope));
        }

        _mainScope = mainScope;
    }

    /// <summary>
    /// Describes this filter to pipeline diagnostics. Required by the filter contract
    /// (<see cref="IFilter{TContext}"/> extends the probe-site interface).
    /// </summary>
    /// <param name="context">The probe context to which the filter scope is added.</param>
    public void Probe(ProbeContext context)
    {
        context.CreateFilterScope("multiTenantMessageFilter");
    }

    /// <summary>
    /// Ensures the context carries an <see cref="ILifetimeScope"/> payload, then invokes the next pipe.
    /// </summary>
    /// <param name="context">The consume context of the message being processed.</param>
    /// <param name="next">The remainder of the pipeline.</param>
    /// <returns>A task that completes when the rest of the pipeline has completed.</returns>
    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        // Stays null when the context already had a scope payload (the factory is then not
        // invoked), in which case this filter does not own the scope and must not dispose it.
        ILifetimeScope scope = null;
        try
        {
            context.GetOrAddPayload<ILifetimeScope>(() =>
            {
                scope = _mainScope.BeginLifetimeScope();

                // Resolve the per-scope Guid here so that the value created in the filter is the
                // one later injected into the consumer (test aid for verifying scope sharing).
                scope.Resolve<Guid>();
                return scope;
            });

            await next.Send(context).ConfigureAwait(false);
        }
        finally
        {
            // Dispose only the scope created by this invocation.
            scope?.Dispose();
        }
    }
}
/// <summary>
/// Pipe specification that resolves a <see cref="MultiTenantMessageFilter{T}"/> from the
/// supplied lifetime scope and adds it to the consume pipe being built.
/// </summary>
/// <typeparam name="T">The message type of the pipe.</typeparam>
public class MultiTenantMessageFilterPipeSpecification<T> : IPipeSpecification<ConsumeContext<T>> where T : class
{
    readonly ILifetimeScope _container;

    /// <summary>Creates the specification.</summary>
    /// <param name="lifetimeScope">Scope used to resolve the filter when the pipe is built.</param>
    public MultiTenantMessageFilterPipeSpecification(ILifetimeScope lifetimeScope)
    {
        _container = lifetimeScope;
    }

    /// <summary>Resolves the filter for <typeparamref name="T"/> and appends it to the pipe.</summary>
    /// <param name="builder">The pipe builder receiving the filter.</param>
    public void Apply(IPipeBuilder<ConsumeContext<T>> builder)
    {
        builder.AddFilter(_container.Resolve<MultiTenantMessageFilter<T>>());
    }

    /// <summary>Reports no validation results.</summary>
    /// <returns>An empty sequence.</returns>
    public IEnumerable<ValidationResult> Validate()
    {
        yield break;
    }
}
/// <summary>
/// Configuration observer that adds a <see cref="MultiTenantMessageFilterPipeSpecification{T}"/>
/// to the consume pipe for each message type it is notified about.
/// </summary>
public class MultiTenantMessageFilterConfigurationObserver : ConfigurationObserver, IMessageConfigurationObserver
{
    readonly ILifetimeScope _container;

    /// <summary>
    /// Creates the observer and passes itself to the inherited <c>Connect</c>, so constructing
    /// an instance is what hooks it up; callers should not connect it a second time.
    /// </summary>
    /// <param name="receiveEndpointConfigurator">The configurator handed to the base observer.</param>
    /// <param name="lifetimeScope">Scope used to resolve the per-message pipe specifications.</param>
    public MultiTenantMessageFilterConfigurationObserver(IConsumePipeConfigurator receiveEndpointConfigurator, ILifetimeScope lifetimeScope)
        : base(receiveEndpointConfigurator)
    {
        _container = lifetimeScope;
        Connect(this);
    }

    /// <summary>
    /// Resolves the pipe specification for <typeparamref name="TMessage"/> and adds it to the configurator.
    /// </summary>
    /// <typeparam name="TMessage">The message type being configured.</typeparam>
    /// <param name="configurator">The consume pipe configurator to extend.</param>
    public void MessageConfigured<TMessage>(IConsumePipeConfigurator configurator)
        where TMessage : class
    {
        configurator.AddPipeSpecification(
            _container.Resolve<MultiTenantMessageFilterPipeSpecification<TMessage>>());
    }
}
/// <summary>
/// Autofac module that registers the multi-tenant filter components and configures a
/// RabbitMQ-backed MassTransit bus with the example consumers.
/// </summary>
public class MessagingModule : Module
{
    /// <summary>Adds the messaging registrations to the container.</summary>
    /// <param name="builder">The container builder receiving the registrations.</param>
    protected override void Load(ContainerBuilder builder)
    {
        builder.RegisterType<MultiTenantMessageFilterConfigurationObserver>().AsSelf();
        builder.RegisterGeneric(typeof(MultiTenantMessageFilter<>)).As(typeof(MultiTenantMessageFilter<>));
        // Use a simple Guid to test that the value set in the filter is the same value injected into the consumer
        builder.Register(c => Guid.NewGuid()).AsSelf().InstancePerLifetimeScope();
        builder.RegisterGeneric(typeof(MultiTenantMessageFilterPipeSpecification<>)).As(typeof(MultiTenantMessageFilterPipeSpecification<>));
        builder.AddMassTransit(configurator =>
        {
            configurator.AddConsumer<Example1Consumer>();
            configurator.AddConsumer<Example2Consumer>();
            configurator.AddBus(context =>
            {
                var options = context.Resolve<MessagingOptions>();
                var connection = options.Connection;
                var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
                {
                    // IConsumePipeConfigurator isn't registered in the container (at least at this
                    // point), so the bus configurator is supplied as a typed parameter.
                    //
                    // The observer connects itself while it is being constructed, so resolving it
                    // is all that is needed. Do NOT also pass it to
                    // cfg.ConnectConsumerConfigurationObserver(...): that would connect the same
                    // observer a second time.
                    context.Resolve<MultiTenantMessageFilterConfigurationObserver>(
                        new TypedParameter(typeof(IConsumePipeConfigurator), cfg));

                    var host = cfg.Host(connection.Server, connection.Port, connection.VirtualHost,
                        h =>
                        {
                            ...
                        });
                    cfg.ConfigureConsumer<Example1Consumer>("Example1Queue", host, options, context);
                    cfg.ConfigureConsumer<Example2Consumer>("Example2Queue", host, options, context);
                });
                return bus;
            });
        });
    }
}
/// <summary>
/// Helpers for wiring container-resolved consumers onto RabbitMQ receive endpoints.
/// </summary>
public static class Extensions
{
    /// <summary>
    /// Declares a receive endpoint on <paramref name="queueName"/> and configures
    /// <typeparamref name="TConsumer"/> on it, adding an interval retry policy when
    /// <c>options.RetryCount</c> is greater than zero.
    /// </summary>
    /// <typeparam name="TConsumer">The consumer type to configure on the endpoint.</typeparam>
    /// <param name="sbc">The RabbitMQ bus factory configurator.</param>
    /// <param name="queueName">Name of the queue backing the receive endpoint.</param>
    /// <param name="host">The RabbitMQ host the endpoint is created on.</param>
    /// <param name="options">Supplies the retry count and the retry interval in minutes.</param>
    /// <param name="context">Component context passed through to the consumer configuration.</param>
    public static void ConfigureConsumer<TConsumer>(this IRabbitMqBusFactoryConfigurator sbc, string queueName, IRabbitMqHost host, MessagingOptions options,
        IComponentContext context) where TConsumer : class, IConsumer
    {
        sbc.ReceiveEndpoint(host, queueName, endpoint =>
        {
            // Retry is opt-in: a non-positive count leaves the endpoint without a retry policy.
            var retryEnabled = options.RetryCount > 0;
            if (retryEnabled)
            {
                endpoint.UseRetry(retry =>
                    retry.Interval(options.RetryCount, TimeSpan.FromMinutes(options.RetryIntervalInMinutes)));
            }

            endpoint.ConfigureConsumer<TConsumer>(context);
        });
    }
}
}
@phatboyg
Copy link

Your observer is already calling Connect(this), so you should not call this line. You can resolve it from the container, but the constructor will need the dependencies to connect it properly using the IReceiveEndpointConfigurator.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment