Skip to content

Instantly share code, notes, and snippets.

@jyoung
Created July 12, 2012 20:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jyoung/3100830 to your computer and use it in GitHub Desktop.
Save jyoung/3100830 to your computer and use it in GitHub Desktop.
Hooking up EasyNetQ actions with StructureMap
/// <summary>
/// <see cref="IServiceBus"/> implementation that delegates to an EasyNetQ <see cref="IBus"/>.
/// </summary>
public class EasyNetQServiceBus : IServiceBus
{
    private readonly IBus bus;

    /// <summary>
    /// Creates a service bus that wraps the supplied bus instance.
    /// </summary>
    /// <param name="bus">The bus all subscribe/publish calls are forwarded to.</param>
    /// <exception cref="ArgumentNullException"><paramref name="bus"/> is null.</exception>
    public EasyNetQServiceBus(IBus bus)
    {
        // Fail at construction time instead of on the first Subscribe/Publish call.
        if (bus == null)
        {
            throw new ArgumentNullException("bus");
        }

        this.bus = bus;
    }

    #region Implementation of IServiceBus

    /// <summary>
    /// Subscribes <paramref name="onMessage"/> to messages of type <typeparamref name="TMessage"/>.
    /// Each message is handled inside a task started with <see cref="TaskFactory.StartNew(Action)"/>,
    /// and that task is what is handed back to <c>SubscribeAsync</c>.
    /// </summary>
    /// <typeparam name="TMessage">The message type to subscribe to.</typeparam>
    /// <param name="subscriberId">Subscription identifier, passed through to the bus unchanged.</param>
    /// <param name="onMessage">Callback invoked once per received message.</param>
    /// <exception cref="ArgumentNullException"><paramref name="onMessage"/> is null.</exception>
    public void Subscribe<TMessage>(string subscriberId, Action<TMessage> onMessage)
    {
        // Without this check a null handler would only fail later, inside the task, for every message.
        if (onMessage == null)
        {
            throw new ArgumentNullException("onMessage");
        }

        bus.SubscribeAsync<TMessage>(subscriberId, message => Task.Factory.StartNew(() =>
        {
            try
            {
                // Placeholder: set NH Current Session Context here
                // (NOTE(review): "NH" presumably means NHibernate - confirm).
                Console.Out.WriteLine("Before Message: " + Thread.CurrentThread.ManagedThreadId);
                onMessage(message);
            }
            finally
            {
                // Placeholder: dispose NH Current Session Context here.
                // Runs even when the handler throws.
                Console.Out.WriteLine("After Message: " + Thread.CurrentThread.ManagedThreadId);
            }
        }));
    }

    /// <summary>
    /// Publishes <paramref name="message"/> on a publish channel that is opened for this call
    /// and disposed as soon as the publish returns.
    /// </summary>
    /// <typeparam name="TMessage">The message type being published.</typeparam>
    /// <param name="message">The message to publish.</param>
    public void Publish<TMessage>(TMessage message)
    {
        using (var channel = bus.OpenPublishChannel())
        {
            channel.Publish(message);
        }
    }

    #endregion
}
/// <summary>
/// Console host: configures the StructureMap container, subscribes every discovered
/// <c>ISubscribeTo&lt;TMessage&gt;</c> handler on the service bus, then publishes each line
/// typed on the console as a <see cref="MyMessage"/>.
/// </summary>
internal class Program
{
    /// <summary>
    /// Application entry point. Runs until the user enters "X" (any case) or the input stream ends.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    private static void Main(string[] args)
    {
        ObjectFactory.Configure(cfg =>
        {
            cfg.For<IServiceBus>().Singleton().Use(() => new EasyNetQServiceBus(RabbitHutch.CreateBus("host=localhost")));
            cfg.Scan(scan =>
            {
                scan.TheCallingAssembly();
                scan.AddAllTypesOf(typeof(ISubscribeTo<>));
            });
        });

        var bus = ObjectFactory.GetInstance<IServiceBus>();

        // Subscribe<TMessage> is generic, so it has to be closed over each message type via reflection.
        var subscriptionMethod = bus.GetType().GetMethod("Subscribe");

        foreach (var subscriberInfo in GetAllSubscribers())
        {
            // Locals declared inside the loop body are distinct per iteration, so the lambda
            // below captures the right values regardless of compiler version.
            var subscriberType = subscriberInfo.SubscriberType;
            var messageType = subscriberInfo.MessageType;

            // Resolve the handler method once per subscription rather than once per message.
            var handleMethod = subscriberType.GetMethod("Handle", new[] { messageType });
            if (handleMethod == null)
            {
                throw new InvalidOperationException(
                    "No public Handle(" + messageType.FullName + ") method found on " + subscriberType.FullName + ".");
            }

            var subscribeMethod = subscriptionMethod.MakeGenericMethod(messageType);

            // An Action<object> is supplied where Subscribe<TMessage> declares Action<TMessage>;
            // ISubscribeTo<> constrains TMessage to reference types.
            Action<object> invoker = message =>
            {
                // A nested container per message; the handler instance is resolved from it and
                // the container is disposed when the handler returns or throws.
                using (var container = ObjectFactory.Container.GetNestedContainer())
                {
                    var instance = container.GetInstance(subscriberType);
                    handleMethod.Invoke(instance, new[] { message });
                }
            };

            subscribeMethod.Invoke(bus, new object[] { subscriberType.Name, invoker });
        }

        while (true)
        {
            Console.WriteLine("Type a message and press Enter to send a message, or type X and enter to close");
            var line = Console.ReadLine();

            // ReadLine returns null once the input stream is exhausted (closed or redirected stdin).
            // Treat that as a request to exit; otherwise the loop would spin forever publishing
            // messages with a null body.
            if (line == null || string.Equals(line, "X", StringComparison.OrdinalIgnoreCase))
            {
                break;
            }

            Console.Out.WriteLine("Publishing Message");
            bus.Publish(new MyMessage { Message = line });
        }
    }

    /// <summary>
    /// Enumerates the container's registrations whose plugin type is a generic type built from
    /// <c>ISubscribeTo&lt;&gt;</c> and yields one <see cref="SubscriberInfo"/> per distinct
    /// (concrete handler type, message type) pair.
    /// </summary>
    /// <returns>A lazily evaluated sequence of handler/message type pairs.</returns>
    private static IEnumerable<SubscriberInfo> GetAllSubscribers()
    {
        var subscribers = ObjectFactory.Model
            .AllInstances
            .Where(x => x.PluginType.IsGenericType)
            .Where(x => x.PluginType.GetGenericTypeDefinition() == typeof(ISubscribeTo<>))
            .ToArray();

        // The same pair can be reached through more than one registration; report it only once
        // so a handler is not subscribed twice for the same message type.
        var seen = new HashSet<Tuple<Type, Type>>();

        foreach (var subscriber in subscribers)
        {
            var concreteType = subscriber.ConcreteType;
            if (concreteType == null)
            {
                continue;
            }

            // A handler may implement ISubscribeTo<> for several message types. Walk every
            // matching interface instead of using SingleOrDefault, which throws when more
            // than one interface matches.
            var subscriberInterfaces = concreteType
                .GetInterfaces()
                .Where(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(ISubscribeTo<>));

            foreach (var subscriberInterface in subscriberInterfaces)
            {
                var messageType = subscriberInterface.GetGenericArguments()[0];
                if (seen.Add(Tuple.Create(concreteType, messageType)))
                {
                    yield return new SubscriberInfo(concreteType, messageType);
                }
            }
        }
    }
}
/// <summary>
/// Immutable pairing of a handler type with the message type it handles.
/// </summary>
public class SubscriberInfo
{
    private readonly Type subscriberType;
    private readonly Type messageType;

    /// <summary>
    /// Stores the two types exactly as supplied; no validation is performed.
    /// </summary>
    /// <param name="subscriberType">The handler type.</param>
    /// <param name="messageType">The message type the handler accepts.</param>
    public SubscriberInfo(Type subscriberType, Type messageType)
    {
        this.subscriberType = subscriberType;
        this.messageType = messageType;
    }

    /// <summary>The handler type passed to the constructor.</summary>
    public Type SubscriberType
    {
        get { return subscriberType; }
    }

    /// <summary>The message type passed to the constructor.</summary>
    public Type MessageType
    {
        get { return messageType; }
    }
}
/// <summary>
/// Contract for a handler of messages of type <typeparamref name="TMessage"/>.
/// </summary>
/// <typeparam name="TMessage">
/// The message type handled. Declared contravariant (<c>in</c>) and constrained to reference types.
/// </typeparam>
public interface ISubscribeTo<in TMessage> where TMessage : class
{
/// <summary>Handles a single message.</summary>
/// <param name="message">The message to handle.</param>
void Handle(TMessage message);
}
/// <summary>
/// First sample handler for <see cref="MyMessage"/>; it only logs to the console.
/// </summary>
public class MyMessageSubscriberOne : ISubscribeTo<MyMessage>
{
    #region Implementation of ISubscribeTo<MyMessage>

    /// <summary>
    /// Writes a line identifying this subscriber and the managed thread it runs on.
    /// The message content itself is not read.
    /// </summary>
    /// <param name="message">The received message (unused).</param>
    public void Handle(MyMessage message)
    {
        var threadId = Thread.CurrentThread.ManagedThreadId;
        Console.Out.WriteLine("Subscriber 1 Consuming MyMessage from Thread: " + threadId);
    }

    #endregion
}
/// <summary>
/// Second sample handler for <see cref="MyMessage"/>; it only logs to the console.
/// </summary>
public class MyMessageSubscriberTwo : ISubscribeTo<MyMessage>
{
    #region Implementation of ISubscribeTo<MyMessage>

    /// <summary>
    /// Writes a line identifying this subscriber and the managed thread it runs on.
    /// The message content itself is not read.
    /// </summary>
    /// <param name="message">The received message (unused).</param>
    public void Handle(MyMessage message)
    {
        var threadId = Thread.CurrentThread.ManagedThreadId;
        Console.Out.WriteLine("Subscriber 2 Consuming MyMessage from Thread: " + threadId);
    }

    #endregion
}
/// <summary>
/// Sample message type published by the console loop and handled by the MyMessage subscribers.
/// </summary>
public class MyMessage
{
/// <summary>The text carried by the message; the console loop sets it to the line the user typed.</summary>
public string Message { get; set; }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment