Created
January 6, 2015 19:52
-
-
Save fschmied/418e38425f83fb81b833 to your computer and use it in GitHub Desktop.
Catch-up subscription
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Sketch of a catch-up subscription: replays events from the event store that have not been
/// handled yet, subscribes to the event bus for live events, and tracks progress through a
/// persisted "last handled checkpoint".
/// </summary>
public class EventSubscriber : IEventSubscriber
{
    // NOTE(review): nothing in this class assigns _persistence; presumably it is injected elsewhere - confirm.
    private IReadModelPersistence _persistence;

    /// <summary>
    /// Replays all events stored so far, subscribes this instance to the event bus, and then
    /// replays once more.
    /// </summary>
    /// <param name="eventBus">Bus this subscriber registers itself with.</param>
    /// <param name="eventStore">Store the not-yet-handled events are read from.</param>
    public void CatchUpAndSubscribe(IEventBus eventBus, IEventStore eventStore)
    {
        CatchUpUntilNow(eventStore);
        eventBus.Subscribe(this);

        // Second pass: presumably picks up events that were stored after the first pass ended
        // but before the subscription became active.
        CatchUpUntilNow(eventStore);
    }

    /// <summary>
    /// Reads event batches from the store, starting after the last handled checkpoint, until an
    /// empty batch is returned. Each batch is handled in its own transaction.
    /// </summary>
    /// <param name="eventStore">Store the event batches are read from.</param>
    private void CatchUpUntilNow(IEventStore eventStore)
    {
        while (true)
        {
            using (var tx = new TransactionScope())
            {
                // In GetLastHandledCheckpoint, make sure the transaction takes and holds a lock on the
                // lastHandledCheckpoint data. That way, concurrent "Receive" calls will not interfere.
                var lastHandledCheckpoint = _persistence.GetLastHandledCheckpoint();

                // Pull the next batch and materialize it once, so the store is not enumerated twice
                // (once for the emptiness check and once for handling).
                var events = eventStore.GetNextEventBatch(from: lastHandledCheckpoint + 1).ToList();
                if (events.Count == 0)
                {
                    // Nothing was changed in this transaction, so leaving without Complete() is fine.
                    break;
                }

                foreach (var evt in events)
                {
                    RunEventHandlers(evt);
                    lastHandledCheckpoint = evt.Checkpoint;
                }

                // Persist the advanced checkpoint in the same transaction as the handler updates;
                // otherwise the next iteration would read the same batch again.
                _persistence.WriteLastHandledCheckpoint(lastHandledCheckpoint);
                tx.Complete();
            }
        }
    }

    /// <summary>
    /// Handles a single event delivered by the event bus: duplicates are ignored, the next
    /// in-order event is handled, and events that arrive too early are parked in the holding bay.
    /// </summary>
    /// <param name="evt">The delivered event.</param>
    public void Receive(Event evt)
    {
        // Save last handled checkpoint in the same transaction used to update read model.
        using (var tx = new TransactionScope())
        {
            // In GetLastHandledCheckpoint, make sure the transaction takes and holds a lock on the
            // lastHandledCheckpoint data. That way, concurrent "CatchUpUntilNow" calls will not interfere.
            var lastHandledCheckpoint = _persistence.GetLastHandledCheckpoint();

            if (evt.Checkpoint <= lastHandledCheckpoint)
            {
                // Duplicate event, ignore. Nothing was written, so the scope is left uncompleted.
                return;
            }

            if (evt.Checkpoint == lastHandledCheckpoint + 1)
            {
                // In-order event, handle.
                Handle(evt);
            }
            else
            {
                // evt.Checkpoint > lastHandledCheckpoint + 1: out-of-order event, save for later.
                _persistence.SaveEventToHoldingBay(evt);
            }

            tx.Complete();
        }
    }

    /// <summary>
    /// Runs the event handlers for <paramref name="evt"/> and then for every directly following
    /// event found in the holding bay, and finally writes the last handled checkpoint.
    /// </summary>
    /// <param name="evt">The in-order event to start with.</param>
    private void Handle(Event evt)
    {
        // Initialized up front: an implicitly typed local must have an initializer.
        var lastHandledCheckpoint = evt.Checkpoint;
        do
        {
            RunEventHandlers(evt);
            lastHandledCheckpoint = evt.Checkpoint;

            // Check if the persistence already holds the next event (if it came in out-of-order earlier).
            evt = _persistence.GetNextEventFromHoldingBay(lastHandledCheckpoint);
        } while (evt != null);

        _persistence.WriteLastHandledCheckpoint(lastHandledCheckpoint);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This Gist just demonstrates an idea for the NEventStore mailing list. I wrote it off the top of my head and make no promises about whether it works at all.