Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save mrclayman/2d95d8d6db349a1e76b0b5a694a827cb to your computer and use it in GitHub Desktop.
Save mrclayman/2d95d8d6db349a1e76b0b5a694a827cb to your computer and use it in GitHub Desktop.
MassTransit saga registration using IServiceCollectionBusConfigurator vs. IRabbitMqBusFactoryConfigurator
public static class IRabbitMqBusFactoryConfiguratorExtensions
{
public static void ConfigureSagaReceiveEndpoint<TSagaState, TSagaStateMachine, TRepository>(
this IRabbitMqBusFactoryConfigurator configurator,
IRabbitMqConfiguration configuration,
string endpointConfigurationKey)
where TSagaState : class, SagaStateMachineInstance
where TSagaStateMachine : RabbitMqSagaStateMachine<TSagaState>
where TRepository : ISagaRepository<TSagaState>
{
if (configuration == null)
{
throw new ArgumentNullException(nameof(configuration));
}
if (string.IsNullOrEmpty(endpointConfigurationKey))
{
throw new ArgumentNullException(nameof(endpointConfigurationKey));
}
IRabbitMqReceiveEndpointConfiguration endpointConfiguration =
configuration.GetReceiveEndpointConfiguration(endpointConfigurationKey);
if (string.IsNullOrEmpty(endpointConfiguration.Queue))
{
throw new ConfigurationErrorsException("Queue name must be defined for a saga receive endpoint");
}
configurator.ReceiveEndpoint(endpointConfiguration.Queue, e =>
{
e.PrefetchCount = endpointConfiguration.PrefetchCount;
e.UseMessageRetry(x =>
{
x.Interval(endpointConfiguration.RetryCount,
TimeSpan.FromMilliseconds(endpointConfiguration.RetryWaitInterval));
});
e.StateMachineSaga(
(TSagaStateMachine)Activator.CreateInstance(typeof(TSagaStateMachine), configuration),
Activator.CreateInstance<TRepository>());
});
}
}
public static class IServiceCollectionBusConfiguratorExtensions
{
public static void ConfigureRedisSagaStateMachine<TSagaState, TSagaStateMachine>(
this IServiceCollectionBusConfigurator configurator,
IRabbitMqConfiguration configuration,
string endpointConfigurationKey,
string redisConnectionString)
where TSagaState : class, SagaStateMachineInstance, ISagaVersion
where TSagaStateMachine : RabbitMqSagaStateMachine<TSagaState>
{
IRabbitMqReceiveEndpointConfiguration endpointConfiguration =
GetReceiveEndpointConfiguration(configuration, endpointConfigurationKey);
configurator.AddSagaStateMachine<TSagaStateMachine, TSagaState>(sc =>
{
sc.UseMessageRetry(rc =>
{
rc.Interval(endpointConfiguration.RetryCount,
TimeSpan.FromMilliseconds(endpointConfiguration.RetryWaitInterval));
});
})
.RedisRepository(redisConnectionString)
.Endpoint(c =>
{
c.PrefetchCount = endpointConfiguration.PrefetchCount;
});
}
}
public abstract class RabbitMqSagaStateMachine<TInstance> : MassTransitStateMachine<TInstance>
where TInstance : class, SagaStateMachineInstance
{
/// <summary>
/// Prefix for URI's denoting queue names.
/// </summary>
private const string QueueUriPrefix = "queue:";
/// <summary>
/// Gets the bus configuration object.
/// </summary>
protected IRabbitMqConfiguration BusConfiguration { get; }
/// <summary>
/// Constructor.
/// </summary>
/// <param name="busConfiguration">Bus configuration object.</param>
protected RabbitMqSagaStateMachine(IRabbitMqConfiguration busConfiguration)
{
BusConfiguration = busConfiguration ?? throw new ArgumentNullException(nameof(busConfiguration));
}
/// <summary>
/// Returns an URI object with preconfigured
/// address of the given send endpoint. It is
/// assumed the address in the configuration
/// already contains the queue URI prefix.
/// </summary>
/// <param name="name">Name of the send endpoint
/// in the configuration.</param>
/// <returns>URI object with address from the
/// desired send endpoint configuration block.</returns>
protected Uri GetSendEndpointUriFromConfiguration(string name) =>
new Uri(BusConfiguration.GetSendEndpointConfiguration(name).Address);
/// <summary>
/// Returns an URI object constructed from
/// the given queue name. The queue prefix is
/// automatically prepended to the queue name.
/// </summary>
/// <param name="queueName">Name of the queue.</param>
/// <returns>New URI object based on the given queue name.</returns>
protected Uri GetSendEndpointUriForQueueName(string queueName) =>
new Uri($"{QueueUriPrefix}{queueName}");
}
public class Startup
{
private void InitializeMessageBus(IServiceCollection services)
{
// Create service bus interface managed by the framework
IRabbitMqConfiguration rabbitMqConfig = RabbitMqConfiguration.Create(Configuration);
string redisConnectionString = ConfigurationTools.GetRedisConnectionString(Configuration);
// Add MassTransit to the services and configure
// its consumers
services.AddMassTransit(c =>
{
c.AddConsumers(Assembly.GetAssembly(typeof(Startup)));
c.UsingRabbitMq((ctx, cfg) =>
{
cfg.Host(rabbitMqConfig.Host, rabbitMqConfig.VirtualHost, h =>
{
h.Username(rabbitMqConfig.User);
h.Password(rabbitMqConfig.Password);
});
// cfg.ConfigureSagaReceiveEndpoint<
// UploadSagaStateMachineInstance,
// UploadSagaStateMachine,
// InMemorySagaRepository<UploadSagaStateMachineInstance>>(
// rabbitMqConfig, UploadSagaStateMachine.EndpointConfigurationKey);
// cfg.ConfigureSagaReceiveEndpoint<
// UpdateSagaStateMachineInstance,
// UpdateSagaStateMachine,
// InMemorySagaRepository<UpdateSagaStateMachineInstance>>(
// rabbitMqConfig, UpdateSagaStateMachine.EndpointConfigurationKey);
});
c.ConfigureRedisSagaStateMachine<UpdateSagaState, UpdateSagaStateMachine>(
rabbitMqConfig, UpdateSagaStateMachine.EndpointConfigurationKey, redisConnectionString);
c.ConfigureRedisSagaStateMachine<UploadSagaState, UploadSagaStateMachine>(
rabbitMqConfig, UploadSagaStateMachine.EndpointConfigurationKey, redisConnectionString);
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment