Skip to content

Instantly share code, notes, and snippets.

Last active December 16, 2015 05:19
Show Gist options
  • Save philcleveland/5383804 to your computer and use it in GitHub Desktop.
Save philcleveland/5383804 to your computer and use it in GitHub Desktop.
GetEventStore Event Dispatcher, compatible with EventStore.ClientAPI v1.1.0
//Special thanks to Andrii Nakryiko and James Nugent
//for their help with this code.
namespace Infrastructure.EventStorage
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Text;
using System.Threading;
using EventStore.ClientAPI;
using Infrastructure.Messaging.Events;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
/// <summary>
/// Event dispatcher for GetEventStore.
/// Compatible with EventStore.ClientAPI v1.1.0.
/// Holds the connection, a catch-up subscription over all events, and a queue of
/// received events that is drained on a thread-pool thread.
/// </summary>
public class GetEventStoreEventDispatcher
// Named as a timeout in milliseconds; not referenced by any code visible in this copy of the file.
// NOTE(review): presumably the wait limit for StopDispatching — confirm.
private const int THREAD_KILL_TIMEOUT_MILLISEC = 5000;
// Bus supplied by the caller (null-checked in the constructor).
// NOTE(review): presumably where deserialized events are published — confirm.
private readonly IEventBus _eventBus;
// Not readonly: the connected/disconnected handlers reassign it.
private EventStoreConnection _connection;
// Endpoint stored by the constructor. NOTE(review): presumably used when connecting — confirm.
private readonly IPEndPoint _eventStoreTcpEndpoint;
// Received events waiting to be processed; drained by PublishEvents.
private readonly ConcurrentQueue<ResolvedEvent> _liveQueue = new ConcurrentQueue<ResolvedEvent>();
// Starts out signaled; PublishEvents resets it while draining the queue and sets it again when done.
private readonly ManualResetEventSlim _liveDone = new ManualResetEventSlim(true);
// Raised by StopDispatching; checked by the publishing code before doing more work.
private volatile bool _stop;
// 0/1 flag switched with Interlocked.CompareExchange so only one publishing work item is queued at a time.
private int _isPublishing;
// Position of the last processed event: loaded from the position repository in the constructor,
// used as the subscription start point and to skip events at or before it.
private Position _lastProcessed;
// Result of SubscribeToAllFrom; null until a subscription has been created.
private EventStoreAllCatchUpSubscription _subscription;
// Set to true/false by the connected/disconnected handlers.
private volatile bool _isEventStoreConnected;
// Source of the last processed position (read in the constructor).
private readonly IPersistGetEventStorePosition _positionRepository;
/// <summary>
/// Creates the dispatcher: validates the bus and the position repository, creates the
/// connection object, stores the arguments and loads the last processed position.
/// </summary>
/// <param name="eventStoreTcpEndpoint">Endpoint to store. NOTE(review): unlike the other arguments it is not null-checked here.</param>
/// <param name="eventBus">Event bus to store; must not be null.</param>
/// <param name="positionRepository">Repository queried for the last processed position; must not be null.</param>
/// <exception cref="ArgumentNullException">When eventBus or positionRepository is null.</exception>
public GetEventStoreEventDispatcher(IPEndPoint eventStoreTcpEndpoint, IEventBus eventBus, IPersistGetEventStorePosition positionRepository)
if (eventBus == null) throw new ArgumentNullException("eventBus");
if (positionRepository == null) throw new ArgumentNullException("positionRepository");
// NOTE(review): this statement has no terminating ';' in this copy — the rest of the settings
// builder chain appears to be missing and must be restored from the original source.
var connSettings = ConnectionSettings.Create()
_connection = EventStoreConnection.Create(connSettings);
_eventStoreTcpEndpoint = eventStoreTcpEndpoint;
_eventBus = eventBus;
_positionRepository = positionRepository;
// Resume from wherever the repository says processing last stopped.
_lastProcessed = _positionRepository.GetLastProcessedPosition();
/// <summary>
/// Starts dispatching.
/// NOTE(review): the method body is missing from this copy of the file; presumably it
/// connects to the stored endpoint — restore it from the original source.
/// </summary>
public void StartDispatching()
/// <summary>
/// Requests a stop: raises the stop flag and stops the subscription if one exists.
/// </summary>
/// <exception cref="TimeoutException">Reported with the message "Unable to stop dispatching in time.".</exception>
public void StopDispatching()
// Raise the flag first so the publishing code sees it before the subscription is stopped.
_stop = true;
if (_subscription != null)
// new TimeSpan() is a zero-length time span.
_subscription.Stop(new TimeSpan());
// NOTE(review): as written this throw is unconditional. A guarding wait (presumably on _liveDone
// with THREAD_KILL_TIMEOUT_MILLISEC) seems to be missing from this copy — confirm against the original.
throw new TimeoutException("Unable to stop dispatching in time.");
/// <summary>
/// Handler for connection errors. The visible body takes no action; logging is still a TODO.
/// </summary>
/// <param name="connection">Connection the error relates to (unused in the visible body).</param>
/// <param name="ex">The error (unused in the visible body).</param>
private void HandleGetEventStoreConnectionError(EventStoreConnection connection, Exception ex)
//TODO: log the error on a passed in ILogger
/// <summary>
/// Handler for the "connected" notification: marks the store as connected and keeps the
/// connection instance that was passed in.
/// </summary>
/// <param name="connection">Connection that became available; stored in _connection.</param>
private void HandleGetEventStoreConnected(EventStoreConnection connection)
_isEventStoreConnected = true;
_connection = connection; //TODO: not sure if I need to do this
// NOTE(review): the statement guarded by this condition is missing from this copy; presumably
// it creates the subscription (e.g. via RecoverSubscription) when none exists yet — confirm.
if (_subscription == null)
/// <summary>
/// Handler for the "disconnected" notification: keeps the connection instance that was
/// passed in and marks the store as not connected.
/// </summary>
/// <param name="connection">Connection that was lost; stored in _connection.</param>
private void HandleGetEventStoreDisconnected(EventStoreConnection connection)
_connection = connection; //TODO: not sure if I need to do this
// RecoverSubscription reads this flag before subscribing.
_isEventStoreConnected = false;
/// <summary>
/// Callback passed to SubscribeToAllFrom for a dropped subscription.
/// NOTE(review): the method body is missing from this copy of the file; presumably it
/// re-establishes the subscription — restore it from the original source.
/// </summary>
/// <param name="subscription">The subscription that was dropped.</param>
/// <param name="reason">Reason text supplied with the drop.</param>
/// <param name="error">Error supplied with the drop.</param>
private void HandleSubscriptionDropped(EventStoreCatchUpSubscription subscription, string reason, Exception error)
/// <summary>
/// Creates the catch-up subscription over all events, starting from the last processed
/// position, with HandleNewEvent and HandleSubscriptionDropped as callbacks.
/// </summary>
private void RecoverSubscription()
// NOTE(review): the statement belonging to this condition (presumably an early return or a wait
// while disconnected) is missing from this copy. As written, the subscribe call below only
// runs when the store is NOT connected, which looks unintended — confirm.
if (!_isEventStoreConnected)
_subscription = _connection.SubscribeToAllFrom(_lastProcessed, false, HandleNewEvent, HandleSubscriptionDropped);
/// <summary>
/// Callback passed to SubscribeToAllFrom for each event; makes sure a publishing work item
/// is running for the live queue.
/// </summary>
/// <param name="subscription">Subscription that delivered the event (unused in the visible body).</param>
/// <param name="event">The delivered event.</param>
private void HandleNewEvent(EventStoreCatchUpSubscription subscription, ResolvedEvent @event)
// NOTE(review): @event is not used in the visible body; an enqueue onto _liveQueue appears
// to be missing from this copy — confirm against the original.
EnsurePublishEvents(_liveQueue, _liveDone);
/// <summary>
/// Queues a thread-pool work item that runs PublishEvents for the given queue, unless a stop
/// has been requested or the publishing flag is already taken.
/// </summary>
/// <param name="queue">Queue handed to PublishEvents.</param>
/// <param name="doneEvent">Completion signal handed to PublishEvents.</param>
private void EnsurePublishEvents(ConcurrentQueue<ResolvedEvent> queue, ManualResetEventSlim doneEvent)
{
    if (_stop)
    {
        return;
    }

    // Flip the flag 0 -> 1 atomically; the value returned is what was there before the attempt.
    int previousFlag = Interlocked.CompareExchange(ref _isPublishing, 1, 0);
    bool flagAcquired = previousFlag == 0;
    if (!flagAcquired)
    {
        return;
    }

    WaitCallback drainQueue = ignoredState => PublishEvents(queue, doneEvent);
    ThreadPool.QueueUserWorkItem(drainQueue);
}
/// <summary>
/// Drains the queue: for each dequeued event positioned after the last processed one, runs
/// ProcessRawEvent and records the event's position. Signals doneEvent when a pass over the
/// queue ends, releases the publishing flag, and loops again if more events arrived meanwhile.
/// </summary>
/// <param name="queue">Queue to drain.</param>
/// <param name="doneEvent">Reset while a pass is running, set when the pass ends.</param>
private void PublishEvents(ConcurrentQueue<ResolvedEvent> queue, ManualResetEventSlim doneEvent)
bool keepGoing = true;
while (keepGoing)
doneEvent.Reset(); // signal we start processing this queue
// NOTE(review): only the flag release below is visible under this condition; the statements that
// presumably accompany it (signalling doneEvent and returning) are missing from this copy — confirm.
if (_stop) // this is to avoid race condition in StopDispatching, though it is 1AM here, so I could be wrong :)
Interlocked.CompareExchange(ref _isPublishing, 0, 1);
ResolvedEvent evnt;
while (!_stop && queue.TryDequeue(out evnt))
if (evnt.OriginalPosition > _lastProcessed) // this ensures we don't process same events twice
var processedEvent = ProcessRawEvent(evnt);
// NOTE(review): the statement guarded by this null check (presumably publishing processedEvent on
// the event bus) is missing from this copy; as written the check guards the position update — confirm.
if (processedEvent != null)
_lastProcessed = evnt.OriginalPosition.Value;
doneEvent.Set(); // signal end of processing particular queue
// Release the publishing flag (1 -> 0).
Interlocked.CompareExchange(ref _isPublishing, 0, 1);
// try to reacquire lock if needed
keepGoing = !_stop && queue.Count > 0 && Interlocked.CompareExchange(ref _isPublishing, 1, 0) == 0;
/// <summary>
/// Turns a raw store event into an IEvent.
/// </summary>
/// <param name="rawEvent">Event as delivered by the subscription.</param>
/// <returns>
/// The deserialized event, or null when the metadata or data is empty or the deserialized
/// object is not an IEvent.
/// </returns>
private IEvent ProcessRawEvent(ResolvedEvent rawEvent)
{
    var recorded = rawEvent.OriginalEvent;

    // Both the metadata and the payload are needed for deserialization.
    if (recorded.Metadata.Length <= 0 || recorded.Data.Length <= 0)
    {
        return null;
    }

    object deserialized = DeserializeEvent(recorded.Metadata, recorded.Data);
    return deserialized as IEvent;
}
/// <summary>
/// Deserializes a raw GetEventStore event into the CLR type named in its metadata.
/// Taken from a gist that James Nugent posted on the GetEventStore forums.
/// </summary>
/// <param name="metadata">UTF-8 encoded JSON metadata; the CLR type name is read from its "EventClrTypeName" property.</param>
/// <param name="data">UTF-8 encoded JSON body of the event.</param>
/// <returns>
/// The deserialized object, or null when the metadata has no "EventClrTypeName" property.
/// </returns>
private static object DeserializeEvent(byte[] metadata, byte[] data)
{
    const string EventClrTypeHeader = "EventClrTypeName";

    var typeNameProperty = JObject.Parse(Encoding.UTF8.GetString(metadata)).Property(EventClrTypeHeader);
    if (typeNameProperty == null)
    {
        // Property() yields null when the header is absent. Returning null lets the caller's
        // "as IEvent" skip the event instead of failing with a NullReferenceException.
        return null;
    }

    var eventClrTypeName = (string)typeNameProperty.Value;

    // NOTE(review): the type to instantiate is chosen by stored metadata. If the store can contain
    // events from writers you do not control, restrict this to a known set of event types.
    return JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data), Type.GetType(eventClrTypeName));
}
Copy link


What changed from the old one

Kind and regards.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment