Skip to content

Instantly share code, notes, and snippets.

@fschmied
Created January 6, 2015 19:52
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fschmied/418e38425f83fb81b833 to your computer and use it in GitHub Desktop.
Save fschmied/418e38425f83fb81b833 to your computer and use it in GitHub Desktop.
Catch-up subscription
/// <summary>
/// Sketch of a catch-up subscription: replays stored events up to the present,
/// subscribes to the event bus, and then handles live events strictly in checkpoint
/// order, parking events that arrive too early in a holding bay.
/// </summary>
public class EventSubscriber : IEventSubscriber
{
    // NOTE(review): nothing in this class assigns _persistence; it has to be supplied
    // (e.g. by injection) before any of the methods below is called.
    private IReadModelPersistence _persistence;

    /// <summary>
    /// Replays all events currently in the store, subscribes this instance to the bus,
    /// and then replays once more.
    /// </summary>
    /// <param name="eventBus">Bus that will deliver live events to <see cref="Receive"/>.</param>
    /// <param name="eventStore">Store the already-persisted events are read from.</param>
    public void CatchUpAndSubscribe(IEventBus eventBus, IEventStore eventStore)
    {
        CatchUpUntilNow(eventStore);
        eventBus.Subscribe(this);

        // The second pass is intended to pick up events that were stored after the first
        // pass finished but before the subscription became active.
        CatchUpUntilNow(eventStore);
    }

    /// <summary>
    /// Reads event batches from the store, starting after the last handled checkpoint,
    /// until a batch comes back empty. Each batch is handled in its own transaction.
    /// </summary>
    /// <param name="eventStore">Store to read the event batches from.</param>
    private void CatchUpUntilNow(IEventStore eventStore)
    {
        while (true)
        {
            using (var tx = new TransactionScope())
            {
                // In GetLastHandledCheckpoint, make sure the transaction takes and holds a lock
                // on the lastHandledCheckpoint data. That way, concurrent "Receive" calls will
                // not interfere.
                var lastHandledCheckpoint = _persistence.GetLastHandledCheckpoint();

                // Pull the next batch of events that follow the last handled checkpoint.
                var events = eventStore.GetNextEventBatch(from: lastHandledCheckpoint + 1);

                // Enumerate the batch exactly once: a separate emptiness check followed by a
                // loop could evaluate a lazily produced batch twice.
                var handledAny = false;
                foreach (var evt in events)
                {
                    RunEventHandlers(evt);
                    lastHandledCheckpoint = evt.Checkpoint;
                    handledAny = true;
                }

                if (!handledAny)
                {
                    // Empty batch: caught up. Nothing was written in this transaction, so it
                    // is left without Complete().
                    break;
                }

                // Persist the checkpoint of the last event handled in this batch, in the same
                // transaction as the handlers' work.
                _persistence.WriteLastHandledCheckpoint(lastHandledCheckpoint);
                tx.Complete();
            }
        }
    }

    /// <summary>
    /// Handles one event delivered by the bus: duplicates are ignored, the next expected
    /// event is handled immediately, and events further ahead are stored for later.
    /// </summary>
    /// <param name="evt">The delivered event.</param>
    public void Receive(Event evt)
    {
        // Save last handled checkpoint in the same transaction used to update read model.
        using (var tx = new TransactionScope())
        {
            // In GetLastHandledCheckpoint, make sure the transaction takes and holds a lock
            // on the lastHandledCheckpoint data. That way, concurrent "CatchUpUntilNow" calls
            // will not interfere.
            var lastHandledCheckpoint = _persistence.GetLastHandledCheckpoint();

            if (evt.Checkpoint <= lastHandledCheckpoint)
            {
                // Duplicate event, ignore. Nothing was written, so the transaction is left
                // without Complete().
                return;
            }

            if (evt.Checkpoint == lastHandledCheckpoint + 1)
            {
                // In-order event, handle it now.
                Handle(evt);
            }
            else
            {
                // evt.Checkpoint > lastHandledCheckpoint + 1: out-of-order event, save for later.
                _persistence.SaveEventToHoldingBay(evt);
            }

            tx.Complete();
        }
    }

    /// <summary>
    /// Runs the handlers for <paramref name="evt"/> and then for every follow-up event the
    /// holding bay returns, and finally persists the checkpoint of the last event handled.
    /// </summary>
    /// <param name="evt">The in-order event to start with; must not be null.</param>
    private void Handle(Event evt)
    {
        // Initialized from the first event: an implicitly typed local needs an initializer.
        var lastHandledCheckpoint = evt.Checkpoint;
        do
        {
            RunEventHandlers(evt);
            lastHandledCheckpoint = evt.Checkpoint;

            // Check if the persistence already holds the next event (if it came in
            // out-of-order earlier).
            evt = _persistence.GetNextEventFromHoldingBay(lastHandledCheckpoint);
        } while (evt != null);

        _persistence.WriteLastHandledCheckpoint(lastHandledCheckpoint);
    }
}
@fschmied
Copy link
Author

fschmied commented Jan 6, 2015

This Gist just demonstrates an idea for the NEventStore mailing list. I wrote it off the top of my head and make no promises about whether it works at all.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment