Last active
August 29, 2015 14:07
-
-
Save JontyMC/18387ef24db1a1b3495e to your computer and use it in GitHub Desktop.
Batched eventstore dispatcher
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
using System.Threading; | |
using EventStore.ClientAPI; | |
public static class EventStoreConnectionExentsions | |
{ | |
public static EventStoreStreamCatchUpSubscription SubscribeToStreamBatchedFrom(this IEventStoreConnection connection, string streamId, int? lastProcessedVersion, bool resolveLinks, Action<EventStoreCatchUpSubscription, ResolvedEvent[]> eventsAppeared, int batchSize = 500) | |
{ | |
var dispatcher = new BatchedEventSubscriber(connection, streamId, lastProcessedVersion, resolveLinks, eventsAppeared, batchSize); | |
return dispatcher.Subscribe(); | |
} | |
class BatchedEventSubscriber | |
{ | |
readonly BlockingCollection<ResolvedEvent> eventQueue; | |
readonly IEventStoreConnection connection; | |
readonly string streamId; | |
readonly int? lastProcessedVersion; | |
readonly bool resolveLinks; | |
readonly Action<EventStoreCatchUpSubscription, ResolvedEvent[]> batchAppeared; | |
int batchExecuting; | |
public BatchedEventSubscriber(IEventStoreConnection connection, string streamId, int? lastProcessedVersion, bool resolveLinks, Action<EventStoreCatchUpSubscription, ResolvedEvent[]> batchAppeared, int batchSize = 500) | |
{ | |
this.connection = connection; | |
this.streamId = streamId; | |
this.lastProcessedVersion = lastProcessedVersion; | |
this.resolveLinks = resolveLinks; | |
this.batchAppeared = batchAppeared; | |
eventQueue = new BlockingCollection<ResolvedEvent>(batchSize); | |
} | |
public EventStoreStreamCatchUpSubscription Subscribe() | |
{ | |
return connection.SubscribeToStreamFrom( | |
streamId, | |
lastProcessedVersion, | |
resolveLinks, | |
EventAppeared | |
); | |
} | |
void EventAppeared(EventStoreCatchUpSubscription subscription, ResolvedEvent resolvedEvent) | |
{ | |
eventQueue.Add(resolvedEvent); | |
if (Interlocked.CompareExchange(ref batchExecuting, 1, 0) == 0) | |
ThreadPool.QueueUserWorkItem(delegate { ExecuteBatch(subscription); }); | |
} | |
void ExecuteBatch(EventStoreCatchUpSubscription subscription) | |
{ | |
do | |
{ | |
var events = new List<ResolvedEvent>(); | |
ResolvedEvent @event; | |
while (eventQueue.TryTake(out @event)) | |
{ | |
events.Add(@event); | |
} | |
batchAppeared(subscription, events.ToArray()); | |
Interlocked.Exchange(ref batchExecuting, 0); | |
} while (eventQueue.Count > 0 && Interlocked.CompareExchange(ref batchExecuting, 1, 0) == 0); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment