-
-
Save jen20/ec6cd45bac755979e16a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using EventStore.ClientAPI; | |
using EventStore.ClientAPI.Exceptions; | |
using NLog; | |
namespace DurableSubscriber | |
{ | |
/// <summary>
/// Feeds events from the all-streams log to a consumer callback by combining a paged
/// forward read of history (the "pull" side) with a live subscription (the "push" side).
/// </summary>
/// <remarks>
/// Pulled events are delivered first. Pushed events are buffered in the meantime and are
/// afterwards delivered only when their position is greater than the last pulled position
/// handed to the consumer, so an event seen by both sides is delivered once.
/// The class offers no way to stop, and does not resubscribe if the subscription is lost.
/// </remarks>
public class NotYetDurableAllStreamsSubscriber
{
    private static readonly Logger Log = LogManager.GetLogger("DurableAllStreamsSubscriber");

    // Maximum number of events requested per forward read.
    private const int ReadPageSize = 500;

    private readonly EventStoreConnection _connection;
    private readonly IKnowLastProcessedPosition _lastPosition;
    private readonly Action<RecordedEvent, Position> _consumer;

    // Events obtained by reading history; filled by the pull task, drained by ProcessPullQueue.
    private readonly ConcurrentQueue<ResolvedEvent> _pullQueue;

    // Events delivered by the live subscription; drained by ProcessPushQueue.
    private readonly ConcurrentQueue<ResolvedEvent> _pushQueue;

    // Position of the most recent pulled event handed to the consumer. Written only by
    // ProcessPullQueue and read only by ProcessPushQueue, which runs after it as a continuation.
    private Position _lastPullPositionProcessed;

    // Set while the history read is still running; used purely as a flag.
    private readonly ManualResetEventSlim _isPulling;

    /// <summary>
    /// Creates a subscriber. Nothing is read or subscribed until <see cref="Start"/> is called.
    /// </summary>
    /// <param name="connection">Connection used both for the subscription and for reading history.</param>
    /// <param name="lastPosition">Supplies the position after which history should be replayed.</param>
    /// <param name="consumer">Callback invoked with each delivered event and its position.</param>
    public NotYetDurableAllStreamsSubscriber(EventStoreConnection connection, IKnowLastProcessedPosition lastPosition, Action<RecordedEvent, Position> consumer)
    {
        _connection = connection;
        _lastPosition = lastPosition;
        _consumer = consumer;

        _pullQueue = new ConcurrentQueue<ResolvedEvent>();
        _pushQueue = new ConcurrentQueue<ResolvedEvent>();
        _isPulling = new ManualResetEventSlim(true);
        _lastPullPositionProcessed = new Position(0, 0);
    }

    /// <summary>
    /// Starts the background pipeline and returns immediately: subscribe, then read and
    /// deliver history, then deliver the buffered and live pushed events.
    /// </summary>
    public void Start()
    {
        Task.Factory
            .StartNew(() => _connection.SubscribeToAll(true, EnqueuePushedEvent).ContinueWith(subscription =>
            {
                if (subscription.Status == TaskStatus.RanToCompletion)
                {
                    Log.Info("SUBSCRIBED");
                }
                else
                {
                    LogFailure(subscription, "Subscribing to all streams");
                }
            }))
            // StartNew yields a Task<Task>; without Unwrap the next stage would run as soon as
            // the subscribe call had been issued rather than when it had finished.
            .Unwrap()
            .ContinueWith(subscribeStage =>
            {
                LogFailure(subscribeStage, "Subscribe stage");

                // The history read is started only now, after the subscribe step has finished,
                // so that events written while history is being read also arrive on the push
                // side instead of falling into a gap between the two.
                Task.Factory
                    .StartNew(() => PullAndEnqueueFromPositionExclusive(_lastPosition.GetLastProcessedPosition()))
                    .ContinueWith(pull => LogFailure(pull, "Reading history"), TaskContinuationOptions.OnlyOnFaulted);

                ProcessPullQueue();
            }, TaskContinuationOptions.LongRunning)
            .ContinueWith(pullStage =>
            {
                LogFailure(pullStage, "Pull queue processing");
                ProcessPushQueue();
            }, TaskContinuationOptions.LongRunning)
            // ProcessPushQueue only ever ends by throwing; make that visible.
            .ContinueWith(pushStage => LogFailure(pushStage, "Push queue processing"), TaskContinuationOptions.OnlyOnFaulted);
    }

    /// <summary>
    /// Logs the outcome of a task that did not run to completion; does nothing otherwise.
    /// Reading <see cref="Task.Exception"/> also marks the fault as observed.
    /// </summary>
    private static void LogFailure(Task task, string stage)
    {
        if (task.IsFaulted || task.IsCanceled)
        {
            Log.Error("{0} did not complete: {1}", stage, task.Exception);
        }
    }

    /// <summary>
    /// Delivers pushed events to the consumer forever, discarding those whose position is
    /// not greater than the last pulled position already delivered.
    /// </summary>
    /// <exception cref="NoResultException">A pushed event carries no position.</exception>
    private void ProcessPushQueue()
    {
        while (true)
        {
            ResolvedEvent current;
            if (!_pushQueue.TryDequeue(out current))
            {
                // Nothing buffered; back off briefly before polling again.
                Thread.Sleep(1);
                continue;
            }

            if (!current.OriginalPosition.HasValue)
            {
                throw new NoResultException("Was pushed event without a position");
            }

            var position = current.OriginalPosition.Value;
            if (position > _lastPullPositionProcessed)
            {
                _consumer(current.Event, position);
            }
            else
            {
                Log.Warn("Throwing Duplicate Away: ({0}, {1}) {2} {3}", position.CommitPosition, position.PreparePosition, current.OriginalStreamId, current.OriginalEvent);
            }
        }
    }

    /// <summary>Subscription callback: buffers a pushed event for later delivery.</summary>
    private void EnqueuePushedEvent(ResolvedEvent pulledEvent)
    {
        _pushQueue.Enqueue(pulledEvent);
    }

    /// <summary>
    /// Delivers pulled events to the consumer until the history read has finished and the
    /// pull queue has been drained, recording the position of each delivered event.
    /// </summary>
    /// <exception cref="NoResultException">A pulled event carries no position.</exception>
    private void ProcessPullQueue()
    {
        // The flag is checked before the queue: the reader clears it only after its last
        // enqueue, so once it reads as cleared the queue already holds everything.
        while (_isPulling.IsSet || !_pullQueue.IsEmpty)
        {
            ResolvedEvent current;
            if (!_pullQueue.TryDequeue(out current))
            {
                Thread.Sleep(1);
                continue;
            }

            if (!current.OriginalPosition.HasValue)
            {
                throw new NoResultException("Pulled event without a position");
            }

            var position = current.OriginalPosition.Value;
            _consumer(current.Event, position);
            _lastPullPositionProcessed = position;
        }
    }

    /// <summary>
    /// Reads the all-streams log forward in pages of <see cref="ReadPageSize"/> starting at
    /// <paramref name="fromPositionExclusive"/>, enqueues every event positioned strictly
    /// after it, and clears the pulling flag once an empty page is returned.
    /// </summary>
    /// <param name="fromPositionExclusive">Position whose own event must not be replayed.</param>
    private void PullAndEnqueueFromPositionExclusive(Position fromPositionExclusive)
    {
        AllEventsSlice slice;
        var nextPosition = fromPositionExclusive;

        do
        {
            slice = _connection.ReadAllEventsForward(nextPosition, ReadPageSize, true);
            nextPosition = slice.NextPosition;

            foreach (var e in slice.Events)
            {
                // An event without a position compares false here and is skipped.
                if (e.OriginalPosition > fromPositionExclusive)
                {
                    _pullQueue.Enqueue(e);
                }
            }
        } while (slice.Events.Length != 0);

        // Reached only when the read completed. If a read throws, the flag stays set, so
        // ProcessPullQueue keeps waiting and live events are not delivered on top of an
        // incomplete history; the failure is logged by the caller's continuation.
        _isPulling.Reset();
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment