Skip to content

Instantly share code, notes, and snippets.

@joliver
Created October 25, 2011 03:16
Show Gist options
  • Star 8 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save joliver/1311195 to your computer and use it in GitHub Desktop.
Save joliver/1311195 to your computer and use it in GitHub Desktop.
NServiceBusCommitDispatcher
/// <summary>
/// Publishes each event of a <see cref="Commit"/> on the supplied <see cref="IBus"/>.
/// Before publishing, headers whose key starts with "Bus." are copied from the commit and
/// from the event onto the outgoing message (with the prefix removed), and the message is
/// stamped with "AggregateId", "CommitVersion" and "EventVersion" headers.
/// </summary>
public sealed class NServiceBusCommitDispatcher : IPublishMessages
{
    private const string AggregateIdKey = "AggregateId";
    private const string CommitVersionKey = "CommitVersion";
    private const string EventVersionKey = "EventVersion";

    // Only headers whose key carries this prefix are forwarded to the bus message.
    private const string BusPrefixKey = "Bus.";

    private readonly IBus bus;

    /// <summary>
    /// Creates a dispatcher that publishes through the given bus.
    /// </summary>
    /// <param name="bus">The bus used by <see cref="Publish"/>.</param>
    public NServiceBusCommitDispatcher(IBus bus)
    {
        this.bus = bus;
    }

    /// <summary>
    /// Releases nothing; this class holds no resources of its own and does not dispose the bus.
    /// </summary>
    public void Dispose()
    {
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Publishes every event in <paramref name="commit"/>, in order, as a bus message.
    /// </summary>
    /// <param name="commit">The commit whose events are published.</param>
    /// <exception cref="ArgumentNullException"><paramref name="commit"/> is null.</exception>
    /// <exception cref="InvalidOperationException">
    /// An event body is null or does not implement <see cref="IMessage"/>. Events that precede
    /// the offending one have already been published when this is thrown.
    /// </exception>
    public void Publish(Commit commit)
    {
        if (commit == null)
        {
            throw new ArgumentNullException("commit");
        }

        for (var i = 0; i < commit.Events.Count; i++)
        {
            var eventMessage = commit.Events[i];
            var busMessage = eventMessage.Body as IMessage;
            if (busMessage == null)
            {
                // Fail with a diagnosable error instead of a NullReferenceException inside a helper.
                var bodyType = eventMessage.Body == null ? "null" : eventMessage.Body.GetType().FullName;
                throw new InvalidOperationException(
                    "Cannot publish event at index "
                    + i.ToString(System.Globalization.CultureInfo.InvariantCulture)
                    + ": body (" + bodyType + ") does not implement IMessage.");
            }

            // Order matters: event headers override commit headers, and the version
            // headers are written last so they override both.
            AppendHeaders(busMessage, commit.Headers);
            AppendHeaders(busMessage, eventMessage.Headers);
            AppendVersion(busMessage, commit, i);

            this.bus.Publish(busMessage);
        }
    }

    /// <summary>
    /// Copies every header whose key starts with "Bus." onto <paramref name="message"/>,
    /// using the key with the prefix stripped. Null values are written as an empty string.
    /// </summary>
    /// <param name="message">The message receiving the headers.</param>
    /// <param name="headers">The candidate headers; a null sequence is treated as empty.</param>
    private static void AppendHeaders(IMessage message, IEnumerable<KeyValuePair<string, object>> headers)
    {
        if (headers == null)
        {
            return;
        }

        // Ordinal comparison: the prefix is an identifier, and Substring below relies on the
        // match covering exactly BusPrefixKey.Length characters.
        foreach (var header in headers.Where(x => x.Key.StartsWith(BusPrefixKey, StringComparison.Ordinal)))
        {
            var key = header.Key.Substring(BusPrefixKey.Length);
            var value = (header.Value ?? string.Empty).ToString();
            message.SetHeader(key, value);
        }
    }

    /// <summary>
    /// Stamps <paramref name="message"/> with the stream id, the commit's stream revision and
    /// the revision of the individual event at <paramref name="index"/>.
    /// </summary>
    /// <param name="message">The outgoing message for the event at <paramref name="index"/>.</param>
    /// <param name="commit">The commit that contains the event.</param>
    /// <param name="index">Zero-based position of the event within the commit.</param>
    private static void AppendVersion(IMessage message, Commit commit, int index)
    {
        // Header values are machine-readable, so numbers are formatted culture-invariantly.
        var invariant = System.Globalization.CultureInfo.InvariantCulture;

        message.SetHeader(AggregateIdKey, commit.StreamId.ToString());
        message.SetHeader(CommitVersionKey, commit.StreamRevision.ToString(invariant));
        message.SetHeader(EventVersionKey, GetSpecificEventVersion(commit, index).ToString(invariant));
    }

    /// <summary>
    /// Computes the revision of a single event from the commit's stream revision.
    /// </summary>
    /// <param name="commit">The commit that contains the event.</param>
    /// <param name="index">Zero-based position of the event within the commit.</param>
    /// <returns>StreamRevision - Events.Count + 1 + index.</returns>
    private static int GetSpecificEventVersion(Commit commit, int index)
    {
        // e.g. (StreamRevision: 120) - (5 events) + 1 + (index @ 4: the last index) = event version: 120
        return commit.StreamRevision - commit.Events.Count + 1 + index;
    }
}
@joliver
Copy link
Author

joliver commented Nov 17, 2011

In v3.0, the `IPublishMessages` interface is named `IDispatchCommits`.

@smhinsey
Copy link

Do you happen to know of an equivalent example for MassTransit?

@joliver
Copy link
Author

joliver commented Mar 19, 2012

Sorry, I haven't used MT in quite some time, but I imagine it would be relatively close to this.

@smhinsey
Copy link

smhinsey commented Mar 19, 2012 via email

@padenj
Copy link

padenj commented Jul 18, 2012

There is a great example using MassTransit in the Documently sample application found here: https://github.com/haf/Documently

@worldspawn
Copy link

Thanks padenj

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment