Created
July 12, 2012 20:41
-
-
Save jyoung/3100830 to your computer and use it in GitHub Desktop.
Hooking up EasyNetQ actions with StructureMap
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class EasyNetQServiceBus : IServiceBus | |
{ | |
private readonly IBus bus; | |
public EasyNetQServiceBus(IBus bus) | |
{ | |
this.bus = bus; | |
} | |
#region Implementation of IServiceBus | |
public void Subscribe<TMessage>(string subscriberId, Action<TMessage> onMessage) | |
{ | |
bus.SubscribeAsync<TMessage>(subscriberId, (m) => Task.Factory.StartNew(() => | |
{ | |
try | |
{ | |
// Set NH Current Session Context | |
Console.Out.WriteLine("Before Message: " + Thread.CurrentThread.ManagedThreadId); | |
onMessage(m); | |
} | |
finally | |
{ | |
// Dispose NH Current Session Context | |
Console.Out.WriteLine("After Message: " + +Thread.CurrentThread.ManagedThreadId); | |
} | |
})); | |
} | |
public void Publish<TMessage>(TMessage message) | |
{ | |
using (var channel = bus.OpenPublishChannel()) | |
{ | |
channel.Publish(message); | |
} | |
} | |
#endregion | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
internal class Program | |
{ | |
private static void Main(string[] args) | |
{ | |
ObjectFactory.Configure(cfg => | |
{ | |
cfg.For<IServiceBus>().Singleton().Use(() => new EasyNetQServiceBus(RabbitHutch.CreateBus("host=localhost"))); | |
cfg.Scan(scan => | |
{ | |
scan.TheCallingAssembly(); | |
scan.AddAllTypesOf(typeof(ISubscribeTo<>)); | |
}); | |
}); | |
var subscriberInfos = GetAllSubscribers(); | |
var bus = ObjectFactory.GetInstance<IServiceBus>(); | |
var subscriptionMethod = bus.GetType().GetMethod("Subscribe"); | |
foreach (var subscriberInfo in subscriberInfos) | |
{ | |
var subscribeMethod = subscriptionMethod.MakeGenericMethod(subscriberInfo.MessageType); | |
SubscriberInfo info = subscriberInfo; | |
Action<object> invoker = o => | |
{ | |
using (var container = ObjectFactory.Container.GetNestedContainer()) | |
{ | |
var subscriberType = info.SubscriberType; | |
var instance = container.GetInstance(subscriberType); | |
var method = subscriberType.GetMethod("Handle", new[] { info.MessageType }); | |
method.Invoke(instance, new[] {o}); | |
} | |
}; | |
subscribeMethod.Invoke(bus, new object[] { info.SubscriberType.Name, invoker }); | |
} | |
while (true) | |
{ | |
Console.WriteLine("Type a message and press Enter to send a message, or type X and enter to close"); | |
var line = Console.ReadLine(); | |
if (line != null && line.ToUpper() == "X") | |
{ | |
break; | |
} | |
Console.Out.WriteLine("Publishing Message"); | |
bus.Publish(new MyMessage(){ Message = line}); | |
} | |
} | |
private static IEnumerable<SubscriberInfo> GetAllSubscribers() | |
{ | |
var subscribers = ObjectFactory.Model | |
.AllInstances | |
.Where(x => x.PluginType.IsGenericType) | |
.Where(x => x.PluginType.GetGenericTypeDefinition() == typeof(ISubscribeTo<>)) | |
.ToArray(); | |
foreach(var subscriber in subscribers) | |
{ | |
var concreteType = subscriber.ConcreteType; | |
var subscriberOfInterface = concreteType | |
.GetInterfaces() | |
.SingleOrDefault(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ISubscribeTo<>)); | |
if (subscriberOfInterface == null) continue; | |
var messageType = subscriberOfInterface.GetGenericArguments()[0]; | |
yield return new SubscriberInfo(concreteType, messageType); | |
} | |
} | |
} | |
public class SubscriberInfo | |
{ | |
public Type SubscriberType { get; private set; } | |
public Type MessageType { get; private set; } | |
public SubscriberInfo(Type subscriberType, Type messageType) | |
{ | |
SubscriberType = subscriberType; | |
MessageType = messageType; | |
} | |
} | |
public interface ISubscribeTo<in TMessage> where TMessage : class | |
{ | |
void Handle(TMessage message); | |
} | |
public class MyMessageSubscriberOne : ISubscribeTo<MyMessage> | |
{ | |
#region Overrides of SubscribeTo<MyMessage> | |
public void Handle(MyMessage message) | |
{ | |
Console.Out.WriteLine("Subscriber 1 Consuming MyMessage from Thread: " + Thread.CurrentThread.ManagedThreadId); | |
} | |
#endregion | |
} | |
public class MyMessageSubscriberTwo : ISubscribeTo<MyMessage> | |
{ | |
#region Overrides of SubscribeTo<MyMessage> | |
public void Handle(MyMessage message) | |
{ | |
Console.Out.WriteLine("Subscriber 2 Consuming MyMessage from Thread: " + Thread.CurrentThread.ManagedThreadId); | |
} | |
#endregion | |
} | |
public class MyMessage | |
{ | |
public string Message { get; set; } | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment