Skip to content

Instantly share code, notes, and snippets.

@jen20
Created February 13, 2013 23:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save jen20/ec6cd45bac755979e16a to your computer and use it in GitHub Desktop.
Save jen20/ec6cd45bac755979e16a to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using EventStore.ClientAPI;
using EventStore.ClientAPI.Exceptions;
using NLog;
namespace DurableSubscriber
{
public class NotYetDurableAllStreamsSubscriber
{
private static readonly Logger Log = LogManager.GetLogger("DurableAllStreamsSubscriber");
private const int ReadPageSize = 500;
private readonly EventStoreConnection _connection;
private readonly IKnowLastProcessedPosition _lastPosition;
private readonly Action<RecordedEvent, Position> _consumer;
private readonly ConcurrentQueue<ResolvedEvent> _pullQueue;
private readonly ConcurrentQueue<ResolvedEvent> _pushQueue;
private Position _lastPullPositionProcessed;
private readonly ManualResetEventSlim _isPulling;
public NotYetDurableAllStreamsSubscriber(EventStoreConnection connection, IKnowLastProcessedPosition lastPosition, Action<RecordedEvent, Position> consumer)
{
_connection = connection;
_lastPosition = lastPosition;
_consumer = consumer;
_pullQueue = new ConcurrentQueue<ResolvedEvent>();
_pushQueue = new ConcurrentQueue<ResolvedEvent>();
_isPulling = new ManualResetEventSlim(true);
_lastPullPositionProcessed = new Position(0, 0);
}
public void Start()
{
Task.Factory.StartNew(() => _connection.SubscribeToAll(true, EnqueuePushedEvent).ContinueWith(t => Log.Info("SUBSCRIBED")))
.ContinueWith(t => ProcessPullQueue())
.ContinueWith(t => ProcessPushQueue());
Task.Factory.StartNew(() => PullAndEnqueueFromPositionExclusive(_lastPosition.GetLastProcessedPosition()));
}
private void ProcessPushQueue()
{
while (true)
{
ResolvedEvent current;
if (_pushQueue.TryDequeue(out current))
{
if (!current.OriginalPosition.HasValue)
throw new NoResultException("Was pushed event without a position");
if (current.OriginalPosition.Value > _lastPullPositionProcessed)
_consumer(current.Event, current.OriginalPosition.Value);
else
Log.Warn("Throwing Duplicate Away: ({0}, {1}) {2} {3}", current.OriginalPosition.Value.CommitPosition, current.OriginalPosition.Value.PreparePosition, current.OriginalStreamId, current.OriginalEvent);
}
else
{
Thread.Sleep(1);
}
}
}
private void EnqueuePushedEvent(ResolvedEvent pulledEvent)
{
_pushQueue.Enqueue(pulledEvent);
}
private void ProcessPullQueue()
{
while (_isPulling.IsSet || _pullQueue.Count > 0)
{
ResolvedEvent current;
if (_pullQueue.TryDequeue(out current))
{
if (!current.OriginalPosition.HasValue)
throw new NoResultException("Pulled event without a position");
_consumer(current.Event, current.OriginalPosition.Value);
_lastPullPositionProcessed = current.OriginalPosition.Value;
}
else
{
Thread.Sleep(1);
}
}
}
private void PullAndEnqueueFromPositionExclusive(Position fromPositionExclusive)
{
AllEventsSlice slice;
var nextPosition = fromPositionExclusive;
do
{
slice = _connection.ReadAllEventsForward(nextPosition, ReadPageSize, true);
nextPosition = slice.NextPosition;
foreach (var e in slice.Events)
{
if (e.OriginalPosition > fromPositionExclusive)
_pullQueue.Enqueue(e);
}
} while (slice.Events.Length != 0);
_isPulling.Reset();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment