-
-
Save nmehlei/71c63b103448562bbb74 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Subscription callback: projects a single appeared event into the view store.
/// </summary>
/// <remarks>
/// All processing happens under <c>_processingLock</c>:
/// an event delivered by a subscription other than <c>_subscription</c> is ignored and that
/// subscription is queued for stopping; an event numbered below the expected one is skipped;
/// an event numbered above the expected one raises <see cref="EventMissedException"/>.
/// Otherwise the event is deserialized and handled between <c>_viewStore.Begin</c> and
/// <c>_viewStore.Commit</c>, after which <c>ProcessedEvents</c> and
/// <c>_previousEventNumber</c> are advanced. Every exception is logged and rethrown.
/// </remarks>
/// <param name="eventStoreCatchUpSubscription">The subscription that delivered the event.</param>
/// <param name="resolvedEvent">The event that appeared on the subscription.</param>
/// <exception cref="EventMissedException">
/// The received event number is greater than <c>_previousEventNumber + 1</c>.
/// </exception>
private void EventAppeared(EventStoreCatchUpSubscription eventStoreCatchUpSubscription, ResolvedEvent resolvedEvent)
{
    // OriginalEventNumber is the link's number when the event was resolved from a link and the
    // event's own number otherwise. It therefore matches the previously used
    // resolvedEvent.Link.EventNumber whenever Link is set, but no longer throws a
    // NullReferenceException when the event was not delivered through a link.
    var receivedVersion = resolvedEvent.OriginalEventNumber;

    Logger.Info("VolatileProjectionModule \"{0}\": EventAppeared with eventNumber: {1}, Thread ID: {2}", _contextKey, receivedVersion,
        Thread.CurrentThread.ManagedThreadId);

    lock (_processingLock)
    {
        // Events still arriving from a replaced subscription must not touch the projection state.
        if (_subscription != eventStoreCatchUpSubscription)
        {
            Logger.Warn("VolatileProjectionModule \"{0}\": Event appeared over old subscription, skipping processing and stopping old subscription", _contextKey);
            QueueSubscriptionStop(eventStoreCatchUpSubscription);
            return;
        }

        Event evt = null;
        var expectedVersion = _previousEventNumber + 1;

        // NOTE(review): resolvedEvent.Event is dereferenced unconditionally here and below;
        // confirm that events with an unresolvable link cannot reach this handler.
        Logger.Verbose("VolatileProjectionModule \"{0}\": Event appeared of type \"{1}\", received: {2}, expected: {3}, used subscription: {4}, Thread ID: {5}",
            _contextKey, resolvedEvent.Event.EventType, receivedVersion, expectedVersion, eventStoreCatchUpSubscription.GetHashCode(), Thread.CurrentThread.ManagedThreadId);

        try
        {
            // check if new event number is the next from the previous, so nothing gets missed
            if (receivedVersion < expectedVersion)
            {
                Logger.Warn("VolatileProjectionModule \"{0}\": Skipping already processed event with version {1}, expected {2}",
                    _contextKey, receivedVersion, expectedVersion);
                return; // idempotency
            }

            if (receivedVersion > expectedVersion)
            {
                throw new EventMissedException(expectedVersion, receivedVersion);
            }

            using (var ms = new MemoryStream(resolvedEvent.Event.Data))
            {
                _viewStore.Begin(receivedVersion);
                evt = Serializer.DeserializeFromStream<Event>(ms);
                Logger.Verbose("VolatileProjectionModule \"{0}\": Handling event of type {1} with version {2}, used subscription: {3}",
                    _contextKey, evt.GetType().Name, receivedVersion, eventStoreCatchUpSubscription.GetHashCode());
                _projectionManager.Handle(evt);
                _viewStore.Commit(_publishViewUpdates);
            }

            // Only advance the bookkeeping after the commit succeeded.
            ProcessedEvents++;
            _previousEventNumber = receivedVersion;

            Logger.Info("VolatileProjectionModule \"{0}\": Handled event of type {1} with version {2}, used subscription: {3}, Thread ID: {4}",
                _contextKey, evt.GetType().Name, receivedVersion, eventStoreCatchUpSubscription.GetHashCode(), Thread.CurrentThread.ManagedThreadId);
        }
        catch (EventMissedException exc)
        {
            Logger.Error("VolatileProjectionModule \"{0}\": Error during projection processing, missed version, received: {1}, expected: {2}, previous: {3}",
                _contextKey, exc.ReceivedVersion, exc.ExpectedVersion, _previousEventNumber);
            throw;
        }
        catch (Exception exc)
        {
            // evt is still null when Begin or deserialization failed. How Serializer.FormatJson
            // treats null is not visible here, so it is only called with a real event; that way
            // building the log line cannot replace the exception being reported.
            object eventJson = "<not deserialized>";
            if (evt != null)
            {
                eventJson = Serializer.FormatJson(evt);
            }

            Logger.Error("VolatileProjectionModule \"{0}\": Error during projection processing of version {1}, waiting 10sec, event: {2}, exception: {3}",
                _contextKey, receivedVersion, eventJson, exc);

            // The pause runs while _processingLock is still held, so any other thread entering
            // this handler waits for it as well.
            Thread.Sleep(TimeSpan.FromSeconds(10));
            throw;
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment