Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
public class GetEventStoreSubscriptionDispatcher
{
private const string EventClrTypeHeader = "EventClrTypeName";
private readonly IBus bus;
private readonly IEventStoreConnection eventStoreConnection;
private readonly MongoDatabase mongoDatabase;
private static readonly JsonSerializerSettings SerializerSettings;
static GetEventStoreSubscriptionDispatcher()
{
SerializerSettings = new JsonSerializerSettings {TypeNameHandling = TypeNameHandling.None};
}
public GetEventStoreSubscriptionDispatcher(IBus bus, IEventStoreConnection eventStoreConnection, MongoServer mongoServer)
{
this.bus = bus;
this.eventStoreConnection = eventStoreConnection;
mongoDatabase = mongoServer.GetDatabase(DbConfig.ReadModelStoreDatabase);
}
public void SubscribeToEventsForHandlerOfType<T>() where T : IHandlesEvent
{
var eventsToSubscribe = from handler in typeof(T).GetInterfaces()
where handler.IsGenericType &&
handler.GetGenericTypeDefinition() == typeof (IHandlesEvent<>)
select handler.GetGenericArguments().First();
var readModelVersions = mongoDatabase.GetCollection<ReadModelVersion>();
foreach (var @event in eventsToSubscribe)
{
var storeVersionRecord = readModelVersions.FindOne(Query<ReadModelVersion>.EQ(v => v.Id, @event.Name));
var storeVersion = storeVersionRecord != null ? new int?(storeVersionRecord.Version) : null;
var eventStream = string.Concat("$et-", @event.Name);
try
{
Debug.WriteLine("Subscribe to Events from Stream " + eventStream + " from position " + (storeVersion ?? 0));
eventStoreConnection.SubscribeToStreamFrom(eventStream, storeVersion, true,
EventAppeared,
LiveProcessingStarted,
SubscriptionDropped);
}
catch (Exception exception)
{
//...
}
}
}
private void SubscriptionDropped(EventStoreCatchUpSubscription eventStoreCatchUpSubscription, SubscriptionDropReason subscriptionDropReason, Exception arg3)
{
Debug.WriteLine("Dropped " + subscriptionDropReason + " on subscription " +
eventStoreCatchUpSubscription.StreamId + Environment.NewLine + arg3);
}
private void LiveProcessingStarted(EventStoreCatchUpSubscription eventStoreCatchUpSubscription)
{
Debug.WriteLine("Live processing of " + eventStoreCatchUpSubscription.StreamId + " started..");
}
private void EventAppeared(EventStoreCatchUpSubscription eventStoreCatchUpSubscription, ResolvedEvent resolvedEvent)
{
var eventData = Encoding.UTF8.GetString(resolvedEvent.Event.Data);
var eventMetaData = Encoding.UTF8.GetString(resolvedEvent.Event.Metadata);
var eventHeaders = JsonConvert.DeserializeObject<IDictionary<string, object>>(eventMetaData,
SerializerSettings);
var eventType = Type.GetType(eventHeaders[EventClrTypeHeader].ToString());
if (eventType == null)
return;
var @event = JsonConvert.DeserializeObject(eventData, eventType, SerializerSettings);
Debug.WriteLine("Handle Event of Type " + eventType.Name);
var readModelVersions = mongoDatabase.GetCollection<ReadModelVersion>();
var storeVersionRecord = readModelVersions.FindOne(Query<ReadModelVersion>.EQ(v => v.Id, eventType.Name)) ??
new ReadModelVersion { Id = eventType.Name };
//use event number from $et-<Event> stream, not from aggregate
storeVersionRecord.Version = resolvedEvent.OriginalEventNumber;
var method = bus.GetType().GetMethod("Publish").MakeGenericMethod(eventType);
method.Invoke(bus, new[] { @event });
// persist event id after successful event handling
readModelVersions.Save(storeVersionRecord);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment