Skip to content

Instantly share code, notes, and snippets.

@nmehlei
Last active August 29, 2015 14:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nmehlei/71c63b103448562bbb74 to your computer and use it in GitHub Desktop.
Save nmehlei/71c63b103448562bbb74 to your computer and use it in GitHub Desktop.
private void EventAppeared(EventStoreCatchUpSubscription eventStoreCatchUpSubscription, ResolvedEvent resolvedEvent)
{
Logger.Info("VolatileProjectionModule \"{0}\": EventAppeared with eventNumber: {1}, Thread ID: {2}", _contextKey, resolvedEvent.Link.EventNumber,
Thread.CurrentThread.ManagedThreadId);
lock (_processingLock)
{
if (_subscription != eventStoreCatchUpSubscription)
{
Logger.Warn("VolatileProjectionModule \"{0}\": Event appeared over old subscription, skipping processing and stopping old subscription", _contextKey);
QueueSubscriptionStop(eventStoreCatchUpSubscription);
return;
}
Event evt = null;
var expectedVersion = _previousEventNumber + 1;
var receivedVersion = resolvedEvent.Link.EventNumber;
Logger.Verbose("VolatileProjectionModule \"{0}\": Event appeared of type \"{1}\", received: {2}, expected: {3}, used subscription: {4}, Thread ID: {5}",
_contextKey, resolvedEvent.Event.EventType, receivedVersion, expectedVersion, eventStoreCatchUpSubscription.GetHashCode(), Thread.CurrentThread.ManagedThreadId);
try
{
// check if new event number is the next from the previous, so nothing gets missed
if (receivedVersion < expectedVersion)
{
Logger.Warn("VolatileProjectionModule \"{0}\": Skipping already processed event with version {1}, expected {2}",
_contextKey, receivedVersion, expectedVersion);
return; // idempotency
}
if (receivedVersion > expectedVersion)
throw new EventMissedException(expectedVersion, receivedVersion);
using (var ms = new MemoryStream(resolvedEvent.Event.Data))
{
_viewStore.Begin(receivedVersion);
evt = Serializer.DeserializeFromStream<Event>(ms);
Logger.Verbose("VolatileProjectionModule \"{0}\": Handling event of type {1} with version {2}, used subscription: {3}",
_contextKey, evt.GetType().Name, receivedVersion, eventStoreCatchUpSubscription.GetHashCode());
_projectionManager.Handle(evt);
_viewStore.Commit(_publishViewUpdates);
}
ProcessedEvents++;
_previousEventNumber = receivedVersion;
Logger.Info("VolatileProjectionModule \"{0}\": Handled event of type {1} with version {2}, used subscription: {3}, Thread ID: {4}",
_contextKey, evt.GetType().Name, receivedVersion, eventStoreCatchUpSubscription.GetHashCode(), Thread.CurrentThread.ManagedThreadId);
}
catch (EventMissedException exc)
{
Logger.Error("VolatileProjectionModule \"{0}\": Error during projection processing, missed version, received: {1}, expected: {2}, previous: {3}",
_contextKey, exc.ReceivedVersion, exc.ExpectedVersion, _previousEventNumber);
throw;
}
catch (Exception exc)
{
Logger.Error("VolatileProjectionModule \"{0}\": Error during projection processing of version {1}, waiting 10sec, event: {2}, exception: {3}",
_contextKey, receivedVersion, Serializer.FormatJson(evt), exc);
Thread.Sleep(TimeSpan.FromSeconds(10));
throw;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment