Skip to content

Instantly share code, notes, and snippets.

@AlexeyRaga
Created February 24, 2013 02:30
Show Gist options
  • Save AlexeyRaga/5022309 to your computer and use it in GitHub Desktop.
Save AlexeyRaga/5022309 to your computer and use it in GitHub Desktop.
An example of ConventionalSubscriptionBuilder for EasyNetQ
public sealed class ConventionalSubscriptionBuilder
{
private readonly IBus _bus;
private readonly string _handlerMethodName;
//by default handling method is name 'When'
public ConventionalSubscriptionBuilder(IBus bus, string handlerMethodName = "When")
{
if (bus == null) throw new ArgumentNullException("bus");
if (String.IsNullOrWhiteSpace(handlerMethodName)) throw new ArgumentNullException("handlerMethodName");
_bus = bus;
_handlerMethodName = handlerMethodName;
}
public void Subscribe(object subscriber)
{
if (subscriber == null) throw new ArgumentNullException("subscriber");
var subscriberType = subscriber.GetType();
//het all the message types the subscriber wants to subscribe to
var subjectSubscribesTo = GetSubscribedTypes(subscriberType, _handlerMethodName);
foreach (var messageType in subjectSubscribesTo)
{
//build a subscription Id.
var subscriptionId = CreateSubscriptionId(subscriberType, messageType);
Subscribe(_bus, subscriptionId, subscriber, messageType);
}
}
private string CreateSubscriptionId(Type subscriberType, Type messageType)
{
//I simply return an application service name because it is unlikely *for me* that
//two different application services will be having the same name.
//Naming is important! :)
return subscriberType.Name;
}
private void Subscribe(IBus bus, string subscriptionId, object subscriber, Type messageType)
{
//build a "subsribe" delegate.
//can be a place for optimization *if needed*: simply cache an action per message type
var subscribeAction = GetSubscribeAction(bus, messageType);
//a handler that calls subscriber's handling method for the specified message type
var handler = CreateGenericHandler(subscriber, _handlerMethodName, messageType);
//make a subscription
subscribeAction(subscriptionId, handler);
}
private static Action<string, Delegate> GetSubscribeAction(IBus bus, Type messageType)
{
var busInstance = Expression.Constant(bus);
var subscriptionIdParameter = Expression.Parameter(typeof(string), "subscriptionId");
var handlerType = typeof(Action<>).MakeGenericType(messageType);
var handlerParameter = Expression.Parameter(typeof(Delegate), "handler");
var callExpression = Expression.Call(busInstance,
"Subscribe",
new[] { messageType },
subscriptionIdParameter,
Expression.Convert(handlerParameter, handlerType));
return Expression.Lambda<Action<string, Delegate>>(callExpression, subscriptionIdParameter, handlerParameter).Compile();
}
public static Delegate CreateGenericHandler(object instance, string methodName, Type parameterType)
{
var instanceExpression = Expression.Constant(instance);
var parameter = Expression.Parameter(parameterType, "message");
MethodCallExpression callExpression;
try
{
callExpression = Expression.Call(instanceExpression, methodName, null, parameter);
}
catch (InvalidOperationException ex)
{
var signature = String.Format("Method {0}.{1}({2} message) not found.", instance.GetType().FullName,
methodName, parameterType.FullName);
throw new InvalidOperationException(signature, ex);
}
var handlerType = typeof(Action<>).MakeGenericType(parameterType);
return Expression.Lambda(handlerType, callExpression, parameter).Compile();
}
private static IEnumerable<Type> GetSubscribedTypes(Type subscriberType, string handlerMethodName)
{
var subscribedTypes = subscriberType
.GetMethods(BindingFlags.Public | BindingFlags.Instance)
.Where(x => x.Name == handlerMethodName)
.Select(x => x.GetParameters())
.Where(x => x.Length == 1)
.Select(x => x.First())
.Where(x => !x.IsOut)
.Select(x => x.ParameterType)
.Distinct();
return subscribedTypes.ToList();
}
}
//defines the domain context
public static class SaaSBoundedContext
{
//builds and returns all the application services for the domain
public static IEnumerable<object> BuildApplicationServices()
{
yield return new UserProfileApplicationService()
}
//builds and returns all the event ports (see Ports & Adapters) for the domain
public static IEnumerable<object> BuildEventPorts() {
}
}
//An application service, typically for handling Commands.
//By convention commands are delivered to When(CommandType command) methods
public sealed class UserProfileApplicationService
{
public void When(AttachPhotoToUserProfile message)
{
//Load an aggregate, call its behavior, save the aggregate
}
public void When(PromoteUserProfile message)
{
//logic to promote a profile
}
}
//Typically configures a worker.
//Several bounded contexts can be involved if needed
public sealed class WireUp()
{
public void Configure(IBus bus) {
var bus = //build up a bus
var subscriptionBuilder = new ConventionalSubscriptionBuilder(bus);
var appServices = SaaSBoundedContext.BuildApplicationServices();
foreach(var appService in appServices) {
subscriptionBuilder.Subscribe(appService);
}
//subscribe ports (event processors) as well,
//do whatever needed to have the worker up and running
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment