Skip to content

Instantly share code, notes, and snippets.

@cbcwebdev
Created April 17, 2012 23:28
Show Gist options
  • Save cbcwebdev/2409876 to your computer and use it in GitHub Desktop.
Save cbcwebdev/2409876 to your computer and use it in GitHub Desktop.
public interface IReplayCommits
{
void Replay(DateTime start);
void Replay(Guid streamId, int minRevision, int maxRevision);
}
public class ReplayUtility : IReplayCommits
{
private readonly IStoreEvents _storeEvents;
private readonly IEventPublisher _eventPublisher;
public ReplayUtility()
: this(DI.Current.Resolve<IStoreEvents>(), DI.Current.Resolve<IEventPublisher>())
{ }
public ReplayUtility(IStoreEvents storeEvents, IEventPublisher eventPublisher)
{
_storeEvents = storeEvents;
_eventPublisher = eventPublisher;
}
public void Replay(DateTime start)
{
var commits = _storeEvents.Advanced.GetFrom(start);
Handle(commits);
}
public void Replay(Guid streamId, int minRevision, int maxRevision)
{
var commits = _storeEvents.Advanced.GetFrom(streamId, minRevision, maxRevision);
Handle(commits);
}
private void Handle(IEnumerable<Commit> commits)
{
// todo: optionally add a header, so consumers not leveraging gateways can determine whether or not to execute
// note: if optional header is added, create extension method IMessage.IsReplay() for ease of development
foreach (var commit in commits)
_eventPublisher.Publish(commit.Events.Select(e => e.Body), commit.Headers, commit.Events.ToDictionary(e => e.Body, e => e.Headers));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment