Skip to content

Instantly share code, notes, and snippets.

@djmnz
Created February 18, 2016 04:09
Show Gist options
  • Save djmnz/acc2f484187d59d81d97 to your computer and use it in GitHub Desktop.
Save djmnz/acc2f484187d59d81d97 to your computer and use it in GitHub Desktop.
MassTransit + AzureServiceBus + AutoFac
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Autofac;
using MassTransit;
using MassTransit.AutofacIntegration;
using MassTransit.AzureServiceBusTransport;
using Microsoft.ServiceBus;
using StarNow.Events.Core.Configuration;
namespace StarNow.Core.Tests.Events
{
public class ServiceBusIntegrationTest
{
public static void Main()
{
var tasks = new List<Task>();
//tasks.Add(Publisher(CreateServiceBusReceiver(false)));
tasks.Add(Task.Factory.StartNew(() => Receiver(CreateAutoFacReceiver())));
tasks.Add(Task.Factory.StartNew(() => Publisher(CreateAutoFacPublisher())));
//tasks.Add(Receiver(CreateServiceBusReceiver(true)));
//tasks.Add(Receiver(CreateAutoFacReceiverManual()));
Task.WaitAll(tasks.ToArray());
}
public async static Task Publisher(IBusControl bus)
{
using (var handle = bus.Start())
{
int i = 0;
while (i < 10)
{
i++;
if (i%3 == 0)
{
await bus.Publish<SomethingElseHappened>(new SomethingElseHappenedEvent()
{
Claro = new Built() { Message = "hello" },
Blabla = "something else app" + i,
When = DateTime.UtcNow
});
Debug.WriteLine("Published Else " + i);
}
else
{
await bus.Publish<SomethingHappened>(new SomethingHappenedEvent()
{
Test = "console app" + i,
When = DateTime.UtcNow
});
Debug.WriteLine("Published Normal " + i);
}
Thread.Sleep(1000);
}
handle.Stop();
}
}
public async static Task Receiver(IBusControl bus)
{
using (var handle = bus.Start())
{
int i = 0;
while (i < 10)
{
i++;
Debug.WriteLine("OK" + i);
Thread.Sleep(1000);
}
handle.Stop();
}
}
private static IBusControl CreateAutoFacPublisher()
{
var builder = new ContainerBuilder();
builder.RegisterModule(new BusPublisherModule());
var container = builder.Build();
return container.Resolve<IBusControl>();
}
private static IBusControl CreateAutoFacReceiver()
{
var builder = new ContainerBuilder();
builder.RegisterType<SomeDependency>()
.InstancePerLifetimeScope()
.AsSelf();
//builder.RegisterModule(new BusConsumerModule(new IBusConsumerRegistration[]
//{
// new BusConsumerRegistration<SomethingHappened,TestMessageConsumer>("something_happened"),
// new BusConsumerRegistration<SomethingElseHappened,ElseMessageConsumer>("something_else_happened"),
// new BusConsumerRegistration<SomethingElseHappened,SpecialElseMessageConsumer>("something_else_happened_otherqueue")
//}).
builder.RegisterModule(
new BusConsumerModule()
.AddConsumer<SomethingHappened, TestMessageConsumer>("something_happened")
.AddConsumer<SomethingElseHappened, ElseMessageConsumer>("something_else_happened")
.AddConsumer<SomethingElseHappened, SpecialElseMessageConsumer>("something_else_happened_otherqueue")
);
var container = builder.Build();
return container.Resolve<IBusControl>();
}
private static IBusControl CreateAutoFacReceiverManual()
{
var builder = new ContainerBuilder();
builder.RegisterType<TestMessageConsumer>();
builder.RegisterGeneric(typeof(AutofacConsumerFactory<>))
.WithParameter(new NamedParameter("name", "message"))
.As(typeof(IConsumerFactory<>));
builder.Register(context =>
{
var busControl = Bus.Factory.CreateUsingAzureServiceBus(sbc =>
{
var serviceUri = ServiceBusEnvironment.CreateServiceUri("sb",
"SOMETHING-ns",
"WebEvents");
var host = sbc.Host(serviceUri, h =>
{
h.OperationTimeout = TimeSpan.FromSeconds(5);
h.TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(
"RootManageSharedAccessKey",
"SOMETHING");
});
sbc.UseJsonSerializer();
sbc.ReceiveEndpoint(host, "something_happened", ep =>
{
ep.Consumer<TestMessageConsumer>(context.Resolve<IConsumerFactory<TestMessageConsumer>>());
});
});
return busControl;
})
.SingleInstance()
.As<IBusControl>()
.As<IBus>();
var container = builder.Build();
return container.Resolve<IBusControl>();
}
}
public interface Build
{
string Message { get; set; }
}
public class Built : Build
{
public string Message { get; set; }
}
public class Logge1r
{
}
public class TestMessageConsumer : IConsumer<SomethingHappened>
{
private readonly SomeDependency _dependency;
public TestMessageConsumer(SomeDependency dependency)
{
_dependency = dependency;
}
public async Task Consume(ConsumeContext<SomethingHappened> context)
{
Debug.WriteLine("SHIT MSG = " + context.Message.Test);
}
}
public class ElseMessageConsumer : IConsumer<SomethingElseHappened>
{
public async Task Consume(ConsumeContext<SomethingElseHappened> context)
{
Debug.WriteLine("ELSE MSG CLARO = " + context.Message.Claro.Message);
Debug.WriteLine("ELSE MSG = " + context.Message.Blabla);
}
}
public class SpecialElseMessageConsumer : IConsumer<SomethingElseHappened>
{
public async Task Consume(ConsumeContext<SomethingElseHappened> context)
{
Debug.WriteLine("SPECIAL ELSE MSG CLARO = " + context.Message.Claro.Message);
Debug.WriteLine("SPECIAL ELSE MSG = " + context.Message.Blabla);
}
}
public interface SomethingElseHappened
{
DateTime When { get; set; }
string Blabla { get; set; }
Build Claro { get; set; }
}
public interface SomethingHappened
{
DateTime When { get; set; }
string Test { get; set; }
}
public class SomethingHappenedEvent : SomethingHappened
{
public SomethingHappenedEvent()
{
}
public DateTime When { get; set; }
public string Test { get; set; }
}
public class SomethingElseHappenedEvent : SomethingElseHappened
{
public DateTime When { get; set; }
public string Blabla { get; set; }
public Build Claro { get; set; }
}
public class SomeDependency : IDisposable
{
private readonly int _instance = Total++;
private static int Total = 0;
public SomeDependency()
{
Debug.WriteLine($"Created {_instance}");
}
public void Dispose()
{
Debug.WriteLine($"DISPOSED {_instance}");
}
}
}
using System;
using System.Collections.Generic;
using Autofac;
using MassTransit;
using MassTransit.AutofacIntegration;
using MassTransit.AzureServiceBusTransport;
using Microsoft.ServiceBus;
namespace StarNow.Events.Core.Configuration
{
public class BusConsumerModule : BusModule
{
private readonly List<IBusConsumerRegistration> _consumers;
public BusConsumerModule()
{
_consumers = new List<IBusConsumerRegistration>();
}
public BusConsumerModule(IEnumerable<IBusConsumerRegistration> consumers)
{
this._consumers = new List<IBusConsumerRegistration>(consumers);
}
protected override void Load(ContainerBuilder builder)
{
foreach (var consumer in _consumers)
{
consumer.RegisterService(builder);
}
builder.RegisterGeneric(typeof(AutofacConsumerFactory<>))
.WithParameter(new NamedParameter("name", "message"))
.As(typeof(IConsumerFactory<>));
builder.Register(context =>
{
var busControl = Bus.Factory.CreateUsingAzureServiceBus(sbc =>
{
var serviceUri = ServiceBusEnvironment.CreateServiceUri("sb",
"SOMETHING-ns",
"WebEvents");
var host = sbc.Host(serviceUri, h =>
{
h.OperationTimeout = TimeSpan.FromSeconds(5);
h.TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(
"RootManageSharedAccessKey",
"SOMETHING");
});
sbc.UseJsonSerializer();
foreach (var register in _consumers)
{
sbc.ReceiveEndpoint(host, register.QueueName, ep =>
{
register.SetupEndpoint(ep, context);
});
}
});
return busControl;
})
.SingleInstance()
.As<IBusControl>()
.As<IBus>();
base.Load(builder);
}
public BusConsumerModule AddConsumer<TMessage, TConsumer>(string queueName, Action<BusConsumerRegistration<TMessage, TConsumer>> registrationConfiguration = null)
where TMessage : class
where TConsumer : class, IConsumer<TMessage>
{
var busConsumerRegistration = new BusConsumerRegistration<TMessage, TConsumer>(queueName);
if (registrationConfiguration != null)
{
registrationConfiguration(busConsumerRegistration);
}
_consumers.Add(busConsumerRegistration);
return this;
}
}
}
using System;
using Autofac;
using Autofac.Builder;
using MassTransit;
using MassTransit.AzureServiceBusTransport;
namespace StarNow.Events.Core.Configuration
{
public interface IBusConsumerRegistration
{
string QueueName { get; set; }
void RegisterService(ContainerBuilder builder);
void SetupEndpoint(IServiceBusReceiveEndpointConfigurator ep, IComponentContext context);
}
public class BusConsumerRegistration<TMessage, TConsumer> : IBusConsumerRegistration
where TMessage : class
where TConsumer : class, IConsumer<TMessage>
{
public BusConsumerRegistration(string queueName)
{
QueueName = queueName;
}
public string QueueName { get; set; }
public Action<IRegistrationBuilder<TConsumer, ConcreteReflectionActivatorData, SingleRegistrationStyle>> ContainerExtensions
{ get; set; }
public void RegisterService(ContainerBuilder builder)
{
var registrationBuilder = builder.RegisterType<TConsumer>().AsImplementedInterfaces().AsSelf();
if (ContainerExtensions != null) ContainerExtensions(registrationBuilder);
}
public void SetupEndpoint(IServiceBusReceiveEndpointConfigurator ep, IComponentContext context)
{
ep.Consumer<TConsumer>(context.Resolve<IConsumerFactory<TConsumer>>());
}
public BusConsumerRegistration<TMessage, TConsumer> ConfigureContainer(Action<IRegistrationBuilder<TConsumer, ConcreteReflectionActivatorData, SingleRegistrationStyle>> configure)
{
ContainerExtensions = configure;
return this;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment