Forked from philcleveland/GetEventStoreEventDispatcher.cs
Last active
June 27, 2020 20:22
-
-
Save anakryiko/4968492 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Text; | |
using System.Threading; | |
using EventStore.ClientAPI; | |
using Newtonsoft.Json; | |
using Newtonsoft.Json.Linq; | |
namespace EventStoreEventDispatcher | |
{ | |
/// <summary>
/// Destination for the events that the dispatcher has read and deserialized.
/// </summary>
public interface IEventBus
{
    /// <summary>
    /// Publishes a single deserialized event.
    /// </summary>
    /// <param name="evnt">The event to publish.</param>
    void Publish(IEvent evnt);
}
/// <summary>
/// Marker interface for events that can be handed to <see cref="IEventBus"/>.
/// It declares no members.
/// </summary>
public interface IEvent
{
}
public class GetEventStoreEventDispatcher : IDisposable | |
{ | |
// Tuning constants.
private const int READ_PAGE_SIZE = 500;                 // count passed to each ReadAllEventsForward call
private const int RECONNECT_TIMEOUT_MILLISEC = 5000;    // how long SubscribeToAll waits for the subscribe task
private const int THREAD_KILL_TIMEOUT_MILLISEC = 5000;  // how long StopDispatching waits for each done event

// Collaborators supplied through the constructor.
private readonly EventStoreConnection _store;
private readonly IEventBus _eventBus;

// Events read by paging through the store, plus the signal that is reset while they are being published.
private readonly ConcurrentQueue<ResolvedEvent> _historicalQueue = new ConcurrentQueue<ResolvedEvent>();
private readonly ManualResetEventSlim _historicalDone = new ManualResetEventSlim(true);

// Events pushed by the subscription, plus the signal that is reset while they are being published.
private readonly ConcurrentQueue<ResolvedEvent> _liveQueue = new ConcurrentQueue<ResolvedEvent>();
private readonly ManualResetEventSlim _liveDone = new ManualResetEventSlim(true);

// Set once dispatching is stopped; the read loop, the handlers and the publisher all check it.
private volatile bool _stop;

// False while RecoverSubscription is catching up; live events are then only buffered.
private volatile bool _livePublishingAllowed;

// 0 = idle, 1 = a publisher is scheduled or running.
private int _isPublishing;

// Position of the newest event that was handled; (-1, -1) until the first one.
private Position _lastProcessed;

// Subscription returned by the most recent SubscribeToAll call; null before the first one.
private EventStoreSubscription _subscription;
/// <summary>
/// Creates a dispatcher that reads events from <paramref name="store"/> and publishes them
/// to <paramref name="eventBus"/>.
/// </summary>
/// <param name="store">Event store connection to read from and subscribe to.</param>
/// <param name="eventBus">Bus that receives the deserialized events.</param>
/// <exception cref="ArgumentNullException">Either argument is null.</exception>
public GetEventStoreEventDispatcher(EventStoreConnection store, IEventBus eventBus)
{
    if (store == null)
    {
        throw new ArgumentNullException("store");
    }

    if (eventBus == null)
    {
        throw new ArgumentNullException("eventBus");
    }

    _eventBus = eventBus;
    _store = store;

    // (-1, -1) is the "nothing processed yet" marker that RecoverSubscription checks for.
    _lastProcessed = new Position(-1, -1);
}
// Credit algorithm to Szymon Pobiega | |
// http://simon-says-architecture.com/2013/02/02/mechanics-of-durable-subscription/#comments | |
// 1. The subscriber always starts with pull assuming there were some messages generated while it was offline | |
// 2. The subscriber pulls messages until there’s nothing left to pull (it is up to date with the stream) | |
// 3. Push subscription is started but arriving messages are not processed immediately but temporarily redirected to a buffer | |
// 4. One last pull is done to ensure nothing happened between step 2 and 3 | |
// 5. Messages from this last pull are processed | |
// 6. Processing messages from push buffer is started. While messages are processed, they are checked against IDs of messages processed in step 5 to ensure there’s no duplicates. | |
// 7. System works in push model until the subscriber is killed or the publisher drops the push subscription.
//Credit to Andrii Nakryiko | |
//If data is written to storage so fast that, between your last pull read and the moment
//you subscribed to push notifications, more events were generated than you request in one
//pull, you need to repeat steps 4-5 a few times until you get a pulled message whose
//position is >= the subscription position (EventStore provides you with those positions).
/// <summary>
/// Begins dispatching by running the catch-up-then-subscribe sequence in
/// <see cref="RecoverSubscription"/> on the calling thread.
/// </summary>
public void StartDispatching()
{
    // Open review question (AN): every step of the sequence blocks on a ManualResetEventSlim,
    // so the explicit parallel threads and synchronization may not be needed at all.
    RecoverSubscription();
}
/// <summary>
/// Catches up with the store and then switches to live publishing: pull, subscribe,
/// pull once more, and finally release the events buffered by the subscription.
/// </summary>
private void RecoverSubscription()
{
    // Buffer live events from now on and let any live publishing that is under way finish,
    // so that the live queue is drained and _lastProcessed is up to date.
    _livePublishingAllowed = false;
    _liveDone.Wait();

    // (-1, -1) means nothing has been processed yet (AN), so reading starts at Position.Start.
    Position resumeFrom;
    if (_lastProcessed == new Position(-1, -1))
    {
        resumeFrom = Position.Start;
    }
    else
    {
        resumeFrom = _lastProcessed;
    }

    var caughtUpTo = ReadHistoricalEventsFrom(resumeFrom);

    // Subscribe first, then pull again to pick up whatever was written in between.
    _subscription = SubscribeToAll();
    ReadHistoricalEventsFrom(caughtUpTo);

    // Wait until the historical queue is empty and _lastProcessed has been updated.
    _historicalDone.Wait();

    _livePublishingAllowed = true;
    EnsurePublishEvents(_liveQueue, _liveDone);
}
/// <summary>
/// Stops dispatching: raises the stop flag, unsubscribes, and waits for both queues'
/// done events.
/// </summary>
/// <exception cref="TimeoutException">
/// A done event was not set within THREAD_KILL_TIMEOUT_MILLISEC.
/// </exception>
public void StopDispatching()
{
    _stop = true;

    if (_subscription != null)
    {
        _subscription.Unsubscribe();
    }

    // PublishEvents re-checks _stop after resetting its done event and then sets it again,
    // which is meant to keep these waits from racing with a publisher that is just starting.
    // The historical queue is waited on first; the live queue only if that wait succeeded.
    var bothIdle = _historicalDone.Wait(THREAD_KILL_TIMEOUT_MILLISEC)
                   && _liveDone.Wait(THREAD_KILL_TIMEOUT_MILLISEC);
    if (!bothIdle)
    {
        throw new TimeoutException("Unable to stop dispatching in time.");
    }
}
/// <summary>
/// Pages forward through the store starting at <paramref name="from"/>, queueing every event
/// for publishing, until a read returns no events or dispatching is stopped.
/// </summary>
/// <param name="from">Position to start reading at.</param>
/// <returns>
/// The next position reported by the last non-empty read, or <paramref name="from"/> if
/// nothing was read.
/// </returns>
private Position ReadHistoricalEventsFrom(Position from)
{
    var cursor = from;

    while (!_stop)
    {
        var page = _store.ReadAllEventsForward(cursor, READ_PAGE_SIZE, false);
        if (page.Events.Length == 0)
        {
            break;
        }

        foreach (var stored in page.Events)
        {
            _historicalQueue.Enqueue(stored);
        }

        // Start publishing this page while the next one is being read.
        EnsurePublishEvents(_historicalQueue, _historicalDone);
        cursor = page.NextPosition;
    }

    return cursor;
}
/// <summary>
/// Subscribes to all events, wiring up <see cref="HandleEventAppeared"/> and
/// <see cref="HandleSubscriptionDropped"/>, and waits for the subscription to be confirmed.
/// </summary>
/// <returns>The established subscription.</returns>
/// <exception cref="TimeoutException">
/// The subscribe task did not complete within RECONNECT_TIMEOUT_MILLISEC.
/// </exception>
private EventStoreSubscription SubscribeToAll()
{
    // TODO: before resubscribing, how can we make sure the store is active and ready to accept?
    // AN: EventStoreConnection connects automatically if it is not connected yet, so nothing
    // has to be done by hand here. Errors (server down, not yet up, ...) still need handling.
    var pending = _store.SubscribeToAll(false, HandleEventAppeared, HandleSubscriptionDropped);

    if (pending.Wait(RECONNECT_TIMEOUT_MILLISEC))
    {
        return pending.Result;
    }

    throw new TimeoutException("Could not reconnect after the subscription was dropped");
}
/// <summary>
/// Subscription callback: buffers the pushed event and, once live publishing is allowed,
/// makes sure a publisher is working on the live queue.
/// </summary>
/// <param name="rawEvent">The event pushed by the subscription.</param>
private void HandleEventAppeared(ResolvedEvent rawEvent)
{
    if (!_stop)
    {
        _liveQueue.Enqueue(rawEvent);

        // While catch-up is still running the event only sits in the buffer.
        if (_livePublishingAllowed)
        {
            EnsurePublishEvents(_liveQueue, _liveDone);
        }
    }
}
/// <summary>
/// Subscription callback: re-runs the catch-up sequence after the subscription is dropped,
/// unless dispatching has been stopped.
/// </summary>
private void HandleSubscriptionDropped()
{
    if (!_stop)
    {
        RecoverSubscription();
    }
}
// Every change of _isPublishing happens under this lock together with the matching done-event
// transition. A caller released by a done event can therefore never find the publisher slot
// still taken, and a done event can never be set while a newer publisher is pending.
private readonly object _publishGate = new object();

/// <summary>
/// Schedules a thread-pool publisher for <paramref name="queue"/> unless dispatching has been
/// stopped or a publisher is already scheduled or running.
/// </summary>
/// <param name="queue">Queue whose events should be published.</param>
/// <param name="doneEvent">
/// Signal that is reset while <paramref name="queue"/> is being published and set again
/// when the publisher gives up its slot.
/// </param>
private void EnsurePublishEvents(ConcurrentQueue<ResolvedEvent> queue, ManualResetEventSlim doneEvent)
{
    lock (_publishGate)
    {
        if (_stop || _isPublishing != 0)
        {
            return;
        }

        _isPublishing = 1;

        // Reset on the caller's thread rather than when the work item starts: a caller that
        // goes straight on to doneEvent.Wait() must block even if the pool has not run the
        // work item yet.
        doneEvent.Reset();
    }

    ThreadPool.QueueUserWorkItem(_ => PublishEvents(queue, doneEvent));
}

/// <summary>
/// Drains <paramref name="queue"/>, publishing every event that is newer than
/// _lastProcessed, then gives up the publisher slot and sets <paramref name="doneEvent"/>.
/// </summary>
/// <param name="queue">Queue to drain.</param>
/// <param name="doneEvent">Signal to set once this publisher has finished.</param>
private void PublishEvents(ConcurrentQueue<ResolvedEvent> queue, ManualResetEventSlim doneEvent)
{
    var released = false;
    try
    {
        while (!released)
        {
            ResolvedEvent evnt;
            while (!_stop && queue.TryDequeue(out evnt))
            {
                if (evnt.OriginalPosition > _lastProcessed) // this ensures we don't process same events twice
                {
                    var processedEvent = ProcessRawEvent(evnt);
                    if (processedEvent != null)
                    {
                        _eventBus.Publish(processedEvent);
                    }

                    _lastProcessed = evnt.OriginalPosition.Value;
                }
            }

            lock (_publishGate)
            {
                // Producers enqueue before they call EnsurePublishEvents. An event added after
                // the last dequeue attempt is therefore either visible here (keep draining) or
                // its producer takes the lock after us and schedules a new publisher.
                if (_stop || queue.IsEmpty)
                {
                    _isPublishing = 0;
                    doneEvent.Set();
                    released = true;
                }
            }
        }
    }
    finally
    {
        if (!released)
        {
            // Deserialization or the bus threw. Free the slot and wake waiters so the failure
            // does not also leave RecoverSubscription/StopDispatching blocked; the exception
            // itself still propagates.
            lock (_publishGate)
            {
                _isPublishing = 0;
                doneEvent.Set();
            }
        }
    }
}
/// <summary>
/// Converts a raw store event into an <see cref="IEvent"/>.
/// </summary>
/// <param name="rawEvent">The event as delivered by the store.</param>
/// <returns>
/// The deserialized event, or null when the original event has empty metadata or empty data.
/// </returns>
private IEvent ProcessRawEvent(ResolvedEvent rawEvent)
{
    var recorded = rawEvent.OriginalEvent;

    // Both the metadata and the payload are required for deserialization.
    if (recorded.Metadata.Length == 0 || recorded.Data.Length == 0)
    {
        return null;
    }

    return DeserializeEvent(recorded.Metadata, recorded.Data);
}
/// <summary>
/// Deserializes the event from the raw GetEventStore event to my event.
/// Took this from a gist that James Nugent posted on the GetEventStore forums.
/// </summary>
/// <param name="metadata">UTF-8 JSON metadata; its "EventClrTypeName" property names the CLR type of the event.</param>
/// <param name="data">UTF-8 JSON payload of the event.</param>
/// <returns>
/// The deserialized event, or null when the metadata is not a JSON object, carries no
/// "EventClrTypeName" string, or names a type that cannot be loaded or does not implement
/// <see cref="IEvent"/>.
/// </returns>
private static IEvent DeserializeEvent(byte[] metadata, byte[] data)
{
    const string EventClrTypeHeader = "EventClrTypeName";

    // The dispatcher reads every stream, so it also sees events written by other producers.
    // Such events are skipped (null) instead of failing with a NullReferenceException or
    // InvalidCastException.
    JObject header;
    try
    {
        header = JObject.Parse(Encoding.UTF8.GetString(metadata));
    }
    catch (JsonReaderException)
    {
        return null; // metadata is not a JSON object
    }

    var typeProperty = header.Property(EventClrTypeHeader);
    if (typeProperty == null || typeProperty.Value.Type != JTokenType.String)
    {
        return null;
    }

    // NOTE(security): the type to instantiate is chosen by the stored metadata. Restricting it
    // to IEvent implementations narrows the exposure, but anyone who can write to the store
    // can still pick any loadable IEvent type.
    var eventType = Type.GetType((string)typeProperty.Value);
    if (eventType == null || !typeof(IEvent).IsAssignableFrom(eventType))
    {
        return null;
    }

    return (IEvent)JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data), eventType);
}
/// <summary>
/// Stops dispatching and drops the subscription this instance opened.
/// Does nothing if dispatching has already been stopped.
/// </summary>
public void Dispose()
{
    if (_stop)
    {
        return; // StopDispatching or an earlier Dispose already ran
    }

    _stop = true;

    if (_subscription != null)
    {
        _subscription.Unsubscribe();
    }

    // Unlike StopDispatching, this does not wait for publishing that is under way: that wait
    // can end in a TimeoutException, and Dispose should not throw.

    // AN: the EventStoreConnection was connected by whoever passed it in and may be shared
    // with other components, so it is deliberately not closed here.

    // This class has no finalizer, so GC.SuppressFinalize is not needed.
}
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hey Andrii. I just added some code to prevent the live queue from growing very large in my gist.
//Prevent live queue memory explosion.
if (!_livePublishingAllowed && _liveQueue.Count > LIVE_QUEUE_SIZE_LIMIT)
{
ResolvedEvent throwAwayEvent;
_liveQueue.TryDequeue(out throwAwayEvent);
}