Skip to content

Instantly share code, notes, and snippets.

@yevhen
Created March 19, 2013 20:07
Show Gist options
  • Save yevhen/5199613 to your computer and use it in GitHub Desktop.
Save yevhen/5199613 to your computer and use it in GitHub Desktop.
The concept of message handling Component and the example of message handler chaining via functional composition
/* somewhere in your Core.CQRS */
// Base class for all ES-based aggregate command handling components;
//
// NOTE: "Component" is a logical grouping of message handlers by function
// They provide good place to encapsulate chaining of cross-cutting concerns
// into a pipeline, providing simplified helper methods for registration of message handlers
//
// Components are similar to Services, thus they only contain handlers of single type (ie Command Handlers only)
// Components operate on envelope (infrastructure) level
//
// While Message Handlers are open for dependency injection, which makes them extremely easy to unit test,
// Components, on the opposite, are self-contained and depend only on external configuration,
// In this respect they act as Composition Roots, effectively shielding consumers
// from intricacies of Message Handlers configuration and composition.
//
// Components implement cross-cutting/infrastructure functionality
// Individual handlers could be easily unit tested as they are nothing more than
// just a static functions, having dependencies only on input parameters,
//
// I know I'm guilty for using inheritance but I don't care :)
// it works perfectly here and concept as the whole proven itself in real-world
// putting functions below in their own classes doesn't provide much benefits except purity
// they are anyway make sense only in the context of a particular component
public abstract class AggregateCommandHandlingComponentBase : MessageHandlingComponentBase
{
ICommandHandlerRegistry registry;
Action<EventEnvelope> publisher;
EventStoreConnection store;
public AggregateCommandHandlingComponentBase Configure(Action<EventEnvelope> publisher)
{
this.publisher = publisher;
// you can pass this externally if you wish
var address = ConfigurationManager.AppSettings["EventStore.Uri"];
var port = int.Parse(ConfigurationManager.AppSettings["EventStore.Port"]);
var ipEndPoint = new IPEndPoint(IPAddress.Parse(address), port);
store = new EventStoreConnection(ipEndPoint);
return this;
}
public void Register(ICommandHandlerRegistry commandHandlerRegistry)
{
registry = commandHandlerRegistry;
Bootstrap();
}
protected void RegisterSingleton<TCommand>(Func<AggregateUnitOfWork, Action<TCommand>> handler) where TCommand : Command
{
Register<TCommand>(retryOnConcurrency: true, retryOnDuplicates: true, handler: handler);
}
protected void Register<TCommand>(Func<AggregateUnitOfWork, Action<TCommand>> handler) where TCommand : Command
{
Register<TCommand>(retryOnConcurrency: false, retryOnDuplicates: false, handler: handler);
}
void Register<TCommand>(bool retryOnConcurrency, bool retryOnDuplicates, Func<AggregateUnitOfWork, Action<TCommand>> handler) where TCommand : Command
{
registry.RegisterCommandHandler<TCommand>(x =>
PerformLogging(
CollectStatistics(MessageCategory.Command,
PublishEvents(Log, publisher,
RetryOnConcurrencyConflicts(retryOnConcurrency, Log,
RetryOnDuplicates(retryOnDuplicates, Log,
HandleIdempotency(Log,
WithinUnitOfWork(store, uow =>
envelope => handler(uow)((TCommand)envelope.Command) // unwrap message
)))))))
(x));
}
public static Action<CommandEnvelope> PublishEvents(Logger log, Action<EventEnvelope> publisher, Func<CommandEnvelope, IEnumerable<EventEnvelope>> next)
{
return envelope =>
{
var events = next(envelope);
foreach (var @event in events)
{
publisher.Publish(@event); // delegate which publishes to external Bus
}
};
}
public static Func<CommandEnvelope, IEnumerable<EventEnvelope>> RetryOnConcurrencyConflicts(bool retry, Logger log, Func<CommandEnvelope, IEnumerable<EventEnvelope>> next)
{
return envelope =>
{
var retries = 0;
while (true) // probably better solution would be is to implement exponential back-off :)
{
try
{
return next(envelope);
}
catch (AggregateConcurrencyException ex)
{
if (retry)
{
if (retries == MaxRetries)
{
log.Error("Got concurrency conflict on aggregate {0} for command {1} and limit of max retry attempts [{2}] has been reached. Giving up ...", ex.AggregateId, envelope.Id, maxRetries);
throw;
}
retries++;
log.Debug("Got concurrency conflict on aggregate {0} and Retry was designated. Retrying command {1} ... Attempt #{2}", ex.AggregateId, envelope.Id, retries);
}
else
{
log.Debug("Got concurrency conflict on aggregate {0}. Command {1}", ex.AggregateId, envelope.Id);
throw;
}
}
}
};
}
public static Func<CommandEnvelope, IEnumerable<EventEnvelope>> RetryOnDuplicates(bool retry, Logger log, Func<CommandEnvelope, IEnumerable<EventEnvelope>> next)
{
return envelope =>
{
try
{
return next(envelope);
}
catch (DuplicateAggregateException ex)
{
if (retry)
{
log.Debug("Aggregate {0} already exists and Retry was designated. Retrying command {1} ...", ex.AggregateId, envelope.Id);
return next(envelope);
}
log.Debug("Aggregate {0} already exists. Command {1}", ex.AggregateId, envelope.Id);
throw;
}
};
}
public static Func<CommandEnvelope, IEnumerable<EventEnvelope>> HandleIdempotency(Logger log, Func<CommandEnvelope, IEnumerable<EventEnvelope>> next)
{
return envelope =>
{
try
{
// try consume message, ie try storing message id
// if it's not unique it will blow
return next(envelope);
}
catch (MessageAlreadyConsumedException)
{
log.Warn("Ignored duplicate command {0}", envelope.Id);
return new EventEnvelope[0];
}
};
}
public static Func<CommandEnvelope, IEnumerable<EventEnvelope>> WithinUnitOfWork(EventStoreConnection store, Func<AggregateUnitOfWork, Action<CommandEnvelope>> handler)
{
return envelope =>
{
var session = new EventStoreSession(store, envelope.UserId);
var repository = new AggregateRepository(session);
var uow = new AggregateUnitOfWork(repository);
handler(uow)(envelope);
return uow.Commit().ToArray();
};
}
// this is where subclasses whill register domain specific message handlers
protected abstract void Bootstrap();
}
/* somewhere in your OrderManagement.Service */
public sealed class AggregateCommandHandlingComponent : AggregateCommandHandlingComponentBase
{
protected override void Bootstrap()
{
Register<CreateOrder>(uow => new CreateOrderCommandHandler(uow).Handle);
Register<UpdateOrder>(uow => new UpdateOrderCommandHandler(uow).Handle);
Register<DeleteOrder>(uow => new DeleteOrderCommandHandler(uow).Handle);
Register<ApproveOrder>(uow => new ApproveOrderCommandHandler(uow).Handle);
Register<ShipOrder>(uow => new ShipOrderCommandHandler(uow).Handle);
}
}
/* concrete command handler */
// this could also be collapsed in just a static function
// by injecting UnitOfWork as method parameter
public class CreateOrderCommandHandler : Handles<CreateOrder>
{
readonly AggregateUnitOfWork uow;
public CreateInvoiceCommandHandler(AggregateUnitOfWork uow)
{
this.uow = uow;
}
public void Handle(CreateOrder c)
{
uow.Register(new OrderAggregate(c.OrderId, c.Data));
}
}
/* somewhere in your FrontEnd application */
Bus bus = new Bus(); // in-memory router
new AggregateCommandHandlingComponent()
.Configure(e => mq.Enqueue(e))
.Register(bus);
new ProjectionHandlingComponent()
.Register(bus);
new QueryHandlingComponent()
.Register(bus);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment