Skip to content

Instantly share code, notes, and snippets.

@danbarua
Created September 11, 2013 08:05
Show Gist options
  • Save danbarua/6520603 to your computer and use it in GitHub Desktop.
Save danbarua/6520603 to your computer and use it in GitHub Desktop.
Custom version of EasyNetQ's AutoSubscriber
namespace Foo.Common.Messaging.EasyNetQ
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Security.Cryptography;
using System.Text;
using System.Threading.Tasks;
using Castle.Windsor;
using global::EasyNetQ;
using global::EasyNetQ.Topology;
using Foo.Common.Interfaces;
using ServiceStack.Logging;
/// <summary>Hooks up Domain Message Handlers in a <see cref="IWindsorContainer"/> with EasyNetQ subscriptions on a <see cref="IBus"/>.</summary>
public class EasyNetQAdvancedAutoSubscriber
{
/// <summary>The log.</summary>
private static readonly ILog Log = LogManager.GetLogger(typeof(EasyNetQAdvancedAutoSubscriber));
/// <summary>The bus.</summary>
private readonly IBus bus;
/// <summary>
/// The container.
/// </summary>
private readonly IWindsorContainer container;
/// <summary>Initialises a new instance of the <see cref="EasyNetQAdvancedAutoSubscriber"/> class.</summary>
/// <param name="bus">The bus.</param>
/// <param name="container">The container.</param>
/// <exception cref="ArgumentNullException">Thrown if <paramref name="bus"/> or <paramref name="container"/> is null.</exception>
public EasyNetQAdvancedAutoSubscriber(IBus bus, IWindsorContainer container)
{
if (bus == null) throw new ArgumentNullException("bus");
if (container == null) throw new ArgumentNullException("container");
this.bus = bus;
this.container = container;
this.GenerateSubscriptionId = this.DefaultSubscriptionIdGenerator;
this.MessageDispatcher = new EasyNetQWindsorMessageDispatcher(container);
}
/// <summary>
/// Gets or sets a function responsible for generating SubscriptionIds, when you use
/// <see cref="IConsume{T}" />, since it does not let you specify
/// specific SubscriptionIds.
/// Message type and SubscriptionId is the key; which if two
/// equal keys exists, you will get round robin consumption of
/// messages.
/// </summary>
public Func<ConsumerInfo, string> GenerateSubscriptionId { protected get; set; }
/// <summary>
/// Gets or sets an <see cref="IMessageDispatcher"/> dispatcher responsible for consuming a message with the relevant message consumer.
/// </summary>
public IMessageDispatcher MessageDispatcher { get; set; }
/// <summary>
/// Scans the <see cref="IWindsorContainer"/> supplied in the constructor to hook up
/// any instances of ICommandHandler and IEventHandler with a subscription on RabbitMQ.
/// </summary>
public void SubscribeMessageHandlers()
{
var advancedSubscribeMethod = this.GetSubscribeMethodOfAdvancedBus();
var candidateMessageHandlerTypes =
this.container.Kernel.GetAssignableHandlers(typeof(object)).Select(x => x.ComponentModel.Implementation);
var subscriptionInfos = this.GetSubscriptionInfos(candidateMessageHandlerTypes);
try
{
foreach (var kv in subscriptionInfos)
{
foreach (var subscriptionInfo in kv.Value)
{
var wrappedMessageParameterType =
typeof(IMessage<>).MakeGenericType(subscriptionInfo.MessageType);
var dispatchMethod =
this.MessageDispatcher.GetType()
.GetMethod("AdvancedDispatch", BindingFlags.Instance | BindingFlags.Public)
.MakeGenericMethod(subscriptionInfo.MessageType, subscriptionInfo.ConcreteType);
var dispatchMethodType = typeof(Func<,,>).MakeGenericType(
wrappedMessageParameterType, typeof(MessageReceivedInfo), typeof(Task));
var dispatchDelegate = Delegate.CreateDelegate(
dispatchMethodType, this.MessageDispatcher, dispatchMethod);
var subscriptionId = this.GenerateSubscriptionId(subscriptionInfo);
Log.DebugFormat(
"Hooking up {0}-{1} with subscriptionID {2}",
subscriptionInfo.ConcreteType,
subscriptionInfo.MessageType,
subscriptionId);
var queue = Queue.DeclareDurable(subscriptionId);
var exchange = Exchange.DeclareTopic(
TypeNameSerializer.Serialize(subscriptionInfo.MessageType), true, false, null);
queue.BindTo(exchange, "#");
var subscribeMethod = advancedSubscribeMethod.MakeGenericMethod(subscriptionInfo.MessageType);
subscribeMethod.Invoke(this.bus.Advanced, new object[] { queue, dispatchDelegate });
}
}
}
catch (Exception ex)
{
Log.Error(ex);
throw;
}
}
/// <summary>The default subscription id generator.</summary>
/// <param name="c">The c.</param>
/// <returns>The <see cref="string"/>.</returns>
protected virtual string DefaultSubscriptionIdGenerator(ConsumerInfo c)
{
var r = new StringBuilder();
var unique = string.Concat(c.ConcreteType.FullName, ":", c.MessageType.FullName);
using (var md5 = MD5.Create())
{
var buff = md5.ComputeHash(Encoding.UTF8.GetBytes(unique));
foreach (var b in buff)
{
r.Append(b.ToString("x2"));
}
}
return string.Concat(c.MessageType.Name, ":", c.ConcreteType.Name, ":", r.ToString());
}
/// <summary>The get subscribe async method of bus.</summary>
/// <returns>The <see cref="MethodInfo"/>.</returns>
protected MethodInfo GetSubscribeAsyncMethodOfBus()
{
var info =
this.bus.GetType()
.GetMethods()
.Where(m => m.Name == "SubscribeAsync")
.Select(m => new { Method = m, Params = m.GetParameters() })
.Single(m => m.Params.Length == 2 && m.Params[0].ParameterType == typeof(string))
.Method;
return info;
}
/// <summary>The get subscribe method of advanced bus.</summary>
/// <returns>The <see cref="MethodInfo"/>.</returns>
protected MethodInfo GetSubscribeMethodOfAdvancedBus()
{
// this is brittle - we're looking for
// public virtual void Subscribe<T>(IQueue queue, Func<IMessage<T>, MessageReceivedInfo, Task>
// not
// public virtual void Subscribe(IQueue queue, Func<Byte[], MessageProperties, MessageReceivedInfo, Task> onMessage)
// it will do for now but may break if new APIs are added to the EasyNetQ Advanced Bus Api (IAdvancedBus)
var info =
this.bus.Advanced.GetType()
.GetMethods()
.Where(x => x.Name == "Subscribe")
.Select(x => new { Method = x, Params = x.GetParameters() })
.Single(
x =>
x.Params.Length == 2 && x.Params[0].ParameterType == typeof(IQueue)
&& x.Params[1].ParameterType
!= typeof(Func<byte[], MessageProperties, MessageReceivedInfo, Task>))
.Method;
return info;
}
/// <summary>Gets Subscriber Information.</summary>
/// <param name="types">The types.</param>
/// <returns>The a collection of KeyValuePairs mapping types to ConsumerInfo array.</returns>
protected virtual IEnumerable<KeyValuePair<Type, ConsumerInfo[]>> GetSubscriptionInfos(IEnumerable<Type> types)
{
return from concreteType in types.Where(t => t.IsClass)
let subscriptionInfos =
concreteType.GetInterfaces()
.Where(
i =>
i.IsGenericType
&& (i.GetGenericTypeDefinition() == typeof(IEventHandler<>)
|| i.GetGenericTypeDefinition() == typeof(ICommandHandler<>)))
.Select(i => new ConsumerInfo(concreteType, i, i.GetGenericArguments()[0]))
.ToArray()
where subscriptionInfos.Any()
select new KeyValuePair<Type, ConsumerInfo[]>(concreteType, subscriptionInfos);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment