Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@SzymonPobiega
Created September 28, 2018 07:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save SzymonPobiega/785b40b22b69565118e8cd1819d67abe to your computer and use it in GitHub Desktop.
Save SzymonPobiega/785b40b22b69565118e8cd1819d67abe to your computer and use it in GitHub Desktop.
using System;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using NServiceBus;
using NServiceBus.Features;
using NServiceBus.Pipeline;
using NServiceBus.Sagas;
class Program
{
static void Main(string[] args)
{
Start().GetAwaiter().GetResult();
}
static async Task Start()
{
var config = new EndpointConfiguration("MultiTenantSagas");
config.UsePersistence<LearningPersistence>();
config.UseTransport<LearningTransport>();
config.SendFailedMessagesTo("error");
var endpoint = await Endpoint.Start(config);
Console.WriteLine("Type '<correlation> <tenant>' and hit enter to send a message.");
while (true)
{
var line = Console.ReadLine();
if (line == null)
{
continue;
}
var parts = line.Split(' ');
if (parts.Length != 2)
{
continue;
}
var correlation = parts[0].Trim();
var tenant = parts[1].Trim();
var message = new MyMessage
{
Correlation = correlation,
};
var options = new SendOptions();
options.SetHeader("Tenant", tenant);
options.RouteToThisEndpoint();
await endpoint.Send(message, options);
}
}
}
class TenantIdReaderBehavior : Behavior<IIncomingLogicalMessageContext>
{
ConditionalWeakTable<object, string> tenantIdMap;
public TenantIdReaderBehavior(ConditionalWeakTable<object, string> tenantIdMap)
{
this.tenantIdMap = tenantIdMap;
}
public override Task Invoke(IIncomingLogicalMessageContext context, Func<Task> next)
{
var tenantId = context.Headers["Tenant"];
tenantIdMap.Add(context.Message.Instance, tenantId);
return next();
}
}
class MultiTenantSagasFeature : Feature
{
public MultiTenantSagasFeature()
{
EnableByDefault();
DependsOn<Sagas>();
}
protected override void Setup(FeatureConfigurationContext context)
{
//ConditionalWeakTable creates weak associations between objects that don't prevent these objects from being garbage collected.
var tenantIdMap = new ConditionalWeakTable<object, string>();
var sagaMetadata = context.Settings.Get<SagaMetadataCollection>();
//We hand-pick MySaga. In real life we would probably iterate over all sagas and use some convention to select the multi-tenant ones
var mySagaMetadata = sagaMetadata.Find(typeof(MySaga));
var correlationPropertyAccessor = (Func<object, object>) mySagaMetadata.Finders.First().Properties["property-accessor"];
object tenantAwarePropertyAccessor(object msg)
{
var propValue = correlationPropertyAccessor(msg);
if (!tenantIdMap.TryGetValue(msg, out var tenantId))
{
throw new Exception("Missing tenant ID");
}
return $"{tenantId}_{propValue}";
}
//We *assume* there are no other handlers for simplicity. In real world we would check and only overwrite this for property finders
mySagaMetadata.Finders.First().Properties["property-accessor"] = (Func<object, object>) tenantAwarePropertyAccessor;
context.Pipeline.Register(new TenantIdReaderBehavior(tenantIdMap), "Reads tenant ID from the headers and associates it with the message object.");
}
}
class MySaga : Saga<MySagaData>, IAmStartedByMessages<MyMessage>
{
protected override void ConfigureHowToFindSaga(SagaPropertyMapper<MySagaData> mapper)
{
mapper.ConfigureMapping<MyMessage>(m => m.Correlation).ToSaga(s => s.Correlation);
}
public Task Handle(MyMessage message, IMessageHandlerContext context)
{
Data.Counter++;
Console.WriteLine($"Handing message {Data.Counter} in saga for {Data.Correlation}. Saga id: {Data.Id}.");
return Task.CompletedTask;
}
}
class MySagaData : ContainSagaData
{
public string Correlation { get; set; }
public int Counter { get; set; }
}
class MyMessage : IMessage
{
public string Correlation { get; set; }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment