Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save anakryiko/4968492 to your computer and use it in GitHub Desktop.
Save anakryiko/4968492 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Concurrent;
using System.Text;
using System.Threading;
using EventStore.ClientAPI;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
namespace EventStoreEventDispatcher
{
public interface IEventBus
{
void Publish(IEvent evnt);
}
public interface IEvent
{
}
public class GetEventStoreEventDispatcher : IDisposable
{
private const int RECONNECT_TIMEOUT_MILLISEC = 5000;
private const int THREAD_KILL_TIMEOUT_MILLISEC = 5000;
private const int READ_PAGE_SIZE = 500;
private readonly IEventBus _eventBus;
private readonly EventStoreConnection _store;
private readonly ConcurrentQueue<ResolvedEvent> _liveQueue = new ConcurrentQueue<ResolvedEvent>();
private readonly ManualResetEventSlim _liveDone = new ManualResetEventSlim(true);
private readonly ConcurrentQueue<ResolvedEvent> _historicalQueue = new ConcurrentQueue<ResolvedEvent>();
private readonly ManualResetEventSlim _historicalDone = new ManualResetEventSlim(true);
private volatile bool _stop;
private volatile bool _livePublishingAllowed;
private int _isPublishing;
private Position _lastProcessed;
private EventStoreSubscription _subscription;
public GetEventStoreEventDispatcher(EventStoreConnection store, IEventBus eventBus)
{
if (store == null) throw new ArgumentNullException("store");
if (eventBus == null) throw new ArgumentNullException("eventBus");
_store = store;
_eventBus = eventBus;
_lastProcessed = new Position(-1, -1);
}
// Credit algorithm to Szymon Pobiega
// http://simon-says-architecture.com/2013/02/02/mechanics-of-durable-subscription/#comments
// 1. The subscriber always starts with pull assuming there were some messages generated while it was offline
// 2. The subscriber pulls messages until there’s nothing left to pull (it is up to date with the stream)
// 3. Push subscription is started but arriving messages are not processed immediately but temporarily redirected to a buffer
// 4. One last pull is done to ensure nothing happened between step 2 and 3
// 5. Messages from this last pull are processed
// 6. Processing messages from push buffer is started. While messages are processed, they are checked against IDs of messages processed in step 5 to ensure there’s no duplicates.
// 7. System works in push model until subscriber is killed or subscription is dropped by publisher drops push subscription.
//Credit to Andrii Nakryiko
//If data is written to storage at such a speed, that between the moment you did your last
//pull read and the moment you subscribed to push notifications more data (events) were
//generated, than you request in one pull request, you would need to repeat steps 4-5 few
//times until you get a pull message which position is >= subscription position
//(EventStore provides you with those positions).
public void StartDispatching()
{
// AN: not sure why you needed all this explicit parallel threads and synchronization,
// if you block in this method on each step using ManualResetEventSlim?..
RecoverSubscription();
}
private void RecoverSubscription()
{
_livePublishingAllowed = false;
_liveDone.Wait(); // wait until all live processing is finished (queue is empty, _lastProcessed updated)
//AN: if _lastProcessed == (-1, -1) then we haven't processed anything yet, so we start from Position.Start
var startPos = _lastProcessed == new Position(-1, -1) ? Position.Start : _lastProcessed;
var nextPos = ReadHistoricalEventsFrom(startPos);
_subscription = SubscribeToAll();
ReadHistoricalEventsFrom(nextPos);
_historicalDone.Wait(); // wait until historical queue is empty and _lastProcessed updated
_livePublishingAllowed = true;
EnsurePublishEvents(_liveQueue, _liveDone);
}
public void StopDispatching()
{
_stop = true;
if (_subscription != null)
_subscription.Unsubscribe();
// hopefully additional check in PublishEvents (additional check for _stop after setting event) prevents race conditions
if (!_historicalDone.Wait(THREAD_KILL_TIMEOUT_MILLISEC))
throw new TimeoutException("Unable to stop dispatching in time.");
if (!_liveDone.Wait(THREAD_KILL_TIMEOUT_MILLISEC))
throw new TimeoutException("Unable to stop dispatching in time.");
}
private Position ReadHistoricalEventsFrom(Position from)
{
var position = from;
AllEventsSlice slice;
while (!_stop && (slice = _store.ReadAllEventsForward(position, READ_PAGE_SIZE, false)).Events.Length > 0)
{
foreach (var rawEvent in slice.Events)
{
_historicalQueue.Enqueue(rawEvent);
}
EnsurePublishEvents(_historicalQueue, _historicalDone);
position = slice.NextPosition;
}
return position;
}
private EventStoreSubscription SubscribeToAll()
{
//TODO: Before trying to resubscribe - how to ensure that store is active and ready to accept.
//AN: EventStoreConnection automatically tries to connect (if not already connected) to EventStore,
//so you don't have to do something manually
//Though in case of errors, you need to do some actions (if EventStore server is down or not yet up, etc)
var task = _store.SubscribeToAll(false, HandleEventAppeared, HandleSubscriptionDropped);
if (!task.Wait(RECONNECT_TIMEOUT_MILLISEC))
throw new TimeoutException("Could not reconnect after the subscription was dropped");
return task.Result;
}
private void HandleEventAppeared(ResolvedEvent rawEvent)
{
if (_stop) return;
_liveQueue.Enqueue(rawEvent);
if (_livePublishingAllowed)
EnsurePublishEvents(_liveQueue, _liveDone);
}
private void HandleSubscriptionDropped()
{
if (_stop) return;
RecoverSubscription();
}
private void EnsurePublishEvents(ConcurrentQueue<ResolvedEvent> queue, ManualResetEventSlim doneEvent)
{
if (_stop) return;
if (Interlocked.CompareExchange(ref _isPublishing, 1, 0) == 0)
ThreadPool.QueueUserWorkItem(_ => PublishEvents(queue, doneEvent));
}
private void PublishEvents(ConcurrentQueue<ResolvedEvent> queue, ManualResetEventSlim doneEvent)
{
bool keepGoing = true;
while (keepGoing)
{
doneEvent.Reset(); // signal we start processing this queue
if (_stop) // this is to avoid race condition in StopDispatching, though it is 1AM here, so I could be wrong :)
{
doneEvent.Set();
Interlocked.CompareExchange(ref _isPublishing, 0, 1);
return;
}
ResolvedEvent evnt;
while (!_stop && queue.TryDequeue(out evnt))
{
if (evnt.OriginalPosition > _lastProcessed) // this ensures we don't process same events twice
{
var processedEvent = ProcessRawEvent(evnt);
if (processedEvent != null)
_eventBus.Publish(processedEvent);
_lastProcessed = evnt.OriginalPosition.Value;
}
}
doneEvent.Set(); // signal end of processing particular queue
Interlocked.CompareExchange(ref _isPublishing, 0, 1);
// try to reacquire lock if needed
keepGoing = !_stop && queue.Count > 0 && Interlocked.CompareExchange(ref _isPublishing, 1, 0) == 0;
}
}
private IEvent ProcessRawEvent(ResolvedEvent rawEvent)
{
if (rawEvent.OriginalEvent.Metadata.Length > 0 && rawEvent.OriginalEvent.Data.Length > 0)
return DeserializeEvent(rawEvent.OriginalEvent.Metadata, rawEvent.OriginalEvent.Data);
return null;
}
/// <summary>
/// Deserializes the event from the raw GetEventStore event to my event.
/// Took this from a gist that James Nugent posted on the GetEventStore forumns.
/// </summary>
/// <param name="metadata"></param>
/// <param name="data"></param>
/// <returns></returns>
private static IEvent DeserializeEvent(byte[] metadata, byte[] data)
{
const string EventClrTypeHeader = "EventClrTypeName";
var eventClrTypeName = JObject.Parse(Encoding.UTF8.GetString(metadata)).Property(EventClrTypeHeader).Value;
return (IEvent)JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data), Type.GetType((string)eventClrTypeName));
}
public void Dispose()
{
// AN: you haven't called _store.Connect, so I suppose something passed to you initialized (connected)
// EventStoreConnection. So let somewhere on different level manage EventStoreConnection, as connection
// could be shared among many parts and disposing dispatcher doesn't mean you want to close connection.
//_store.Close();
// this class doesn't have finalizer, why do you need all this complexities of disposal and GC.SuppressFinalize ?
//GC.SuppressFinalize(this);
}
}
}
@philcleveland
Copy link

Hey Andrii. I just added some code to prevent the live queue from growing very large in my gist.

//Prevent live queue memory explosion.
if (!_livePublishingAllowed && _liveQueue.Count > LIVE_QUEUE_SIZE_LIMIT)
{
ResolvedEvent throwAwayEvent;
_liveQueue.TryDequeue(out throwAwayEvent);
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment