Skip to content

Instantly share code, notes, and snippets.

@philcleveland
Last active June 8, 2016 23:04
Show Gist options
  • Star 11 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save philcleveland/4744120 to your computer and use it in GitHub Desktop.
Save philcleveland/4744120 to your computer and use it in GitHub Desktop.
Event dispatcher which receives events from the GetEventStore after they are saved. It takes the saved events and publishes them to the passed in Event Bus. This ensures that events are not published until they are saved in the GetEventStore. Big thanks to Andrii for all the reviews and coding help to get this thing working.
public class GetEventStoreEventDispatcher
{
private const int RECONNECT_TIMEOUT_MILLISEC = 5000;
private const int THREAD_KILL_TIMEOUT_MILLISEC = 5000;
private const int READ_PAGE_SIZE = 500;
private const int LIVE_QUEUE_SIZE_LIMIT = 10000;
private readonly IEventBus _eventBus;
private readonly EventStoreConnection _store;
private readonly ConcurrentQueue<ResolvedEvent> _liveQueue = new ConcurrentQueue<ResolvedEvent>();
private readonly ManualResetEventSlim _liveDone = new ManualResetEventSlim(true);
private readonly ConcurrentQueue<ResolvedEvent> _historicalQueue = new ConcurrentQueue<ResolvedEvent>();
private readonly ManualResetEventSlim _historicalDone = new ManualResetEventSlim(true);
private volatile bool _stop;
private volatile bool _livePublishingAllowed;
private int _isPublishing;
private Position _lastProcessed;
private EventStoreSubscription _subscription;
public GetEventStoreEventDispatcher(EventStoreConnection store, IEventBus eventBus)
{
if (store == null) throw new ArgumentNullException("store");
if (eventBus == null) throw new ArgumentNullException("eventBus");
_store = store;
_eventBus = eventBus;
_lastProcessed = new Position(-1, -1);
}
// Credit algorithm to Szymon Pobiega
// http://simon-says-architecture.com/2013/02/02/mechanics-of-durable-subscription/#comments
// 1. The subscriber always starts with pull assuming there were some messages generated while it was offline
// 2. The subscriber pulls messages until there’s nothing left to pull (it is up to date with the stream)
// 3. Push subscription is started but arriving messages are not processed immediately but temporarily redirected to a buffer
// 4. One last pull is done to ensure nothing happened between step 2 and 3
// 5. Messages from this last pull are processed
// 6. Processing messages from push buffer is started. While messages are processed, they are checked against IDs of messages processed in step 5 to ensure there’s no duplicates.
// 7. System works in push model until subscriber is killed or subscription is dropped by publisher drops push subscription.
//Credit to Andrii Nakryiko
//If data is written to storage at such a speed, that between the moment you did your last
//pull read and the moment you subscribed to push notifications more data (events) were
//generated, than you request in one pull request, you would need to repeat steps 4-5 few
//times until you get a pull message which position is >= subscription position
//(EventStore provides you with those positions).
public void StartDispatching()
{
RecoverSubscription();
}
private void RecoverSubscription()
{
_livePublishingAllowed = false;
_liveDone.Wait(); // wait until all live processing is finished (queue is empty, _lastProcessed updated)
//AN: if _lastProcessed == (-1, -1) then we haven't processed anything yet, so we start from Position.Start
var startPos = _lastProcessed == new Position(-1, -1) ? Position.Start : _lastProcessed;
var nextPos = ReadHistoricalEventsFrom(startPos);
_subscription = SubscribeToAll();
ReadHistoricalEventsFrom(nextPos);
_historicalDone.Wait(); // wait until historical queue is empty and _lastProcessed updated
_livePublishingAllowed = true;
EnsurePublishEvents(_liveQueue, _liveDone);
}
public void StopDispatching()
{
_stop = true;
if (_subscription != null)
_subscription.Unsubscribe();
// hopefully additional check in PublishEvents (additional check for _stop after setting event) prevents race conditions
if (!_historicalDone.Wait(THREAD_KILL_TIMEOUT_MILLISEC))
throw new TimeoutException("Unable to stop dispatching in time.");
if (!_liveDone.Wait(THREAD_KILL_TIMEOUT_MILLISEC))
throw new TimeoutException("Unable to stop dispatching in time.");
}
private Position ReadHistoricalEventsFrom(Position from)
{
var position = from;
AllEventsSlice slice;
while (!_stop && (slice = _store.ReadAllEventsForward(position, READ_PAGE_SIZE, false)).Events.Length > 0)
{
foreach (var rawEvent in slice.Events)
{
_historicalQueue.Enqueue(rawEvent);
}
EnsurePublishEvents(_historicalQueue, _historicalDone);
position = slice.NextPosition;
}
return position;
}
private EventStoreSubscription SubscribeToAll()
{
//TODO: Before trying to resubscribe - how to ensure that store is active and ready to accept.
//AN: EventStoreConnection automatically tries to connect (if not already connected) to EventStore,
//so you don't have to do something manually
//Though in case of errors, you need to do some actions (if EventStore server is down or not yet up, etc)
var task = _store.SubscribeToAll(false, HandleEventAppeared, HandleSubscriptionDropped);
if (!task.Wait(RECONNECT_TIMEOUT_MILLISEC))
throw new TimeoutException("Could not reconnect after the subscription was dropped");
return task.Result;
}
private void HandleEventAppeared(ResolvedEvent rawEvent)
{
if (_stop) return;
_liveQueue.Enqueue(rawEvent);
//Prevent live queue memory explosion.
if (!_livePublishingAllowed && _liveQueue.Count > LIVE_QUEUE_SIZE_LIMIT)
{
ResolvedEvent throwAwayEvent;
_liveQueue.TryDequeue(out throwAwayEvent);
}
if (_livePublishingAllowed)
EnsurePublishEvents(_liveQueue, _liveDone);
}
private void HandleSubscriptionDropped()
{
if (_stop) return;
RecoverSubscription();
}
private void EnsurePublishEvents(ConcurrentQueue<ResolvedEvent> queue, ManualResetEventSlim doneEvent)
{
if (_stop) return;
if (Interlocked.CompareExchange(ref _isPublishing, 1, 0) == 0)
ThreadPool.QueueUserWorkItem(_ => PublishEvents(queue, doneEvent));
}
private void PublishEvents(ConcurrentQueue<ResolvedEvent> queue, ManualResetEventSlim doneEvent)
{
bool keepGoing = true;
while (keepGoing)
{
doneEvent.Reset(); // signal we start processing this queue
if (_stop) // this is to avoid race condition in StopDispatching, though it is 1AM here, so I could be wrong :)
{
doneEvent.Set();
Interlocked.CompareExchange(ref _isPublishing, 0, 1);
return;
}
ResolvedEvent evnt;
while (!_stop && queue.TryDequeue(out evnt))
{
if (evnt.OriginalPosition > _lastProcessed) // this ensures we don't process same events twice
{
var processedEvent = ProcessRawEvent(evnt);
if (processedEvent != null)
_eventBus.Publish(processedEvent);
_lastProcessed = evnt.OriginalPosition.Value;
}
}
doneEvent.Set(); // signal end of processing particular queue
Interlocked.CompareExchange(ref _isPublishing, 0, 1);
// try to reacquire lock if needed
keepGoing = !_stop && queue.Count > 0 && Interlocked.CompareExchange(ref _isPublishing, 1, 0) == 0;
}
}
private IEvent ProcessRawEvent(ResolvedEvent rawEvent)
{
if (rawEvent.OriginalEvent.Metadata.Length > 0 && rawEvent.OriginalEvent.Data.Length > 0)
return DeserializeEvent(rawEvent.OriginalEvent.Metadata, rawEvent.OriginalEvent.Data);
return null;
}
/// <summary>
/// Deserializes the event from the raw GetEventStore event to my event.
/// Took this from a gist that James Nugent posted on the GetEventStore forumns.
/// </summary>
/// <param name="metadata"></param>
/// <param name="data"></param>
/// <returns></returns>
private static IEvent DeserializeEvent(byte[] metadata, byte[] data)
{
const string EventClrTypeHeader = "EventClrTypeName";
var eventClrTypeName = JObject.Parse(Encoding.UTF8.GetString(metadata)).Property(EventClrTypeHeader).Value;
return (IEvent)JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data), Type.GetType((string)eventClrTypeName));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment