Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Sample of message dispatcher that routes and records messages (for Lokad.CQRS v2.0)
// This dispatcher routes all incoming messages between two queues (commands/events)
// and also records all messages into tape storage.
// For a registration sample, see https://gist.github.com/1035950
/// <summary>
/// Message dispatcher that first appends every incoming envelope to the
/// "router-log" tape and then forwards it to either the events queue or the
/// commands queue, depending on the content of the envelope items.
/// </summary>
public sealed class RoutingDispatcher : ISingleThreadMessageDispatcher
{
    // Queue writers that have already been resolved, keyed by queue name,
    // so the factory is asked for each queue only once.
    readonly IDictionary<string, IQueueWriter> _routes = new Dictionary<string, IQueueWriter>();
    readonly QueueWriterRegistry _factories;
    readonly string _endpoint;
    readonly ITapeWriter _writer;
    readonly IEnvelopeStreamer _streamer;

    /// <summary>
    /// Creates a dispatcher bound to a single endpoint.
    /// </summary>
    /// <param name="endpoint">Key used to look up the queue writer factory in <paramref name="factories"/>.</param>
    /// <param name="factories">Registry from which the queue writer factory is resolved.</param>
    /// <param name="streamer">Serializer used to turn an envelope into the buffer that is recorded.</param>
    /// <param name="writer">Tape writer that receives a copy of every dispatched envelope.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public RoutingDispatcher(string endpoint, QueueWriterRegistry factories, IEnvelopeStreamer streamer, ITapeWriter writer)
    {
        // Fail fast here instead of with a NullReferenceException on first dispatch.
        if (endpoint == null)
        {
            throw new ArgumentNullException("endpoint");
        }
        if (factories == null)
        {
            throw new ArgumentNullException("factories");
        }
        if (streamer == null)
        {
            throw new ArgumentNullException("streamer");
        }
        if (writer == null)
        {
            throw new ArgumentNullException("writer");
        }

        _factories = factories;
        _writer = writer;
        _streamer = streamer;
        _endpoint = endpoint;
    }

    /// <summary>
    /// Records the envelope to the tape and then routes it to the matching queue.
    /// </summary>
    /// <param name="envelope">Envelope to record and forward.</param>
    /// <exception cref="InvalidOperationException">
    /// The envelope is neither all-events nor all-commands, or no queue writer
    /// factory is registered for the endpoint.
    /// </exception>
    public void DispatchMessage(ImmutableEnvelope envelope)
    {
        // Recording happens before routing, so an envelope that fails to route
        // has still been appended to the tape.
        Record(envelope);
        RouteToProcessor(envelope);
    }

    /// <summary>
    /// Serializes the envelope with the streamer and appends it to the "router-log" tape.
    /// </summary>
    void Record(ImmutableEnvelope message)
    {
        var buffer = _streamer.SaveEnvelopeData(message);
        _writer.Append("router-log", buffer);
    }

    /// <summary>
    /// No initialization is required by this dispatcher.
    /// </summary>
    public void Init()
    {
    }

    /// <summary>
    /// Returns the queue writer for <paramref name="name"/>, resolving it through
    /// the endpoint's factory on first use and caching it afterwards.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// No factory is registered for the configured endpoint.
    /// </exception>
    IQueueWriter GetRoute(string name)
    {
        IQueueWriter value;
        if (_routes.TryGetValue(name, out value))
        {
            return value;
        }

        IQueueWriterFactory factory;
        if (_factories.TryGet(_endpoint, out factory))
        {
            value = factory.GetWriteQueue(name);
            _routes.Add(name, value);
            return value;
        }

        // The registry has no factory for this endpoint: the route cannot be created.
        var message = string.Format("Failed to load factory for route '{0}:{1}'", _endpoint, name);
        throw new InvalidOperationException(message);
    }

    /// <summary>
    /// Forwards the envelope to the events queue when every item is an
    /// <c>IDomainEvent</c>, or to the commands queue when every item is an
    /// <c>IDomainCommand</c>; any other mix is rejected.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// The envelope items are neither all events nor all commands.
    /// </exception>
    void RouteToProcessor(ImmutableEnvelope message)
    {
        // NOTE(review): All() is true for an empty sequence, so an envelope without
        // items is forwarded to the events queue - confirm this is intended.
        if (message.Items.All(i => i.Content is IDomainEvent))
        {
            GetRoute(IdFor.Events).PutMessage(message);
            return;
        }

        if (message.Items.All(i => i.Content is IDomainCommand))
        {
            GetRoute(IdFor.Commands).PutMessage(message);
            return;
        }

        throw new InvalidOperationException("Unexpected envelope contents");
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment