Skip to content

Instantly share code, notes, and snippets.

@JontyMC
Last active August 29, 2015 14:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JontyMC/18387ef24db1a1b3495e to your computer and use it in GitHub Desktop.
Save JontyMC/18387ef24db1a1b3495e to your computer and use it in GitHub Desktop.
Batched eventstore dispatcher
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using EventStore.ClientAPI;
public static class EventStoreConnectionExentsions
{
public static EventStoreStreamCatchUpSubscription SubscribeToStreamBatchedFrom(this IEventStoreConnection connection, string streamId, int? lastProcessedVersion, bool resolveLinks, Action<EventStoreCatchUpSubscription, ResolvedEvent[]> eventsAppeared, int batchSize = 500)
{
var dispatcher = new BatchedEventSubscriber(connection, streamId, lastProcessedVersion, resolveLinks, eventsAppeared, batchSize);
return dispatcher.Subscribe();
}
class BatchedEventSubscriber
{
readonly BlockingCollection<ResolvedEvent> eventQueue;
readonly IEventStoreConnection connection;
readonly string streamId;
readonly int? lastProcessedVersion;
readonly bool resolveLinks;
readonly Action<EventStoreCatchUpSubscription, ResolvedEvent[]> batchAppeared;
int batchExecuting;
public BatchedEventSubscriber(IEventStoreConnection connection, string streamId, int? lastProcessedVersion, bool resolveLinks, Action<EventStoreCatchUpSubscription, ResolvedEvent[]> batchAppeared, int batchSize = 500)
{
this.connection = connection;
this.streamId = streamId;
this.lastProcessedVersion = lastProcessedVersion;
this.resolveLinks = resolveLinks;
this.batchAppeared = batchAppeared;
eventQueue = new BlockingCollection<ResolvedEvent>(batchSize);
}
public EventStoreStreamCatchUpSubscription Subscribe()
{
return connection.SubscribeToStreamFrom(
streamId,
lastProcessedVersion,
resolveLinks,
EventAppeared
);
}
void EventAppeared(EventStoreCatchUpSubscription subscription, ResolvedEvent resolvedEvent)
{
eventQueue.Add(resolvedEvent);
if (Interlocked.CompareExchange(ref batchExecuting, 1, 0) == 0)
ThreadPool.QueueUserWorkItem(delegate { ExecuteBatch(subscription); });
}
void ExecuteBatch(EventStoreCatchUpSubscription subscription)
{
do
{
var events = new List<ResolvedEvent>();
ResolvedEvent @event;
while (eventQueue.TryTake(out @event))
{
events.Add(@event);
}
batchAppeared(subscription, events.ToArray());
Interlocked.Exchange(ref batchExecuting, 0);
} while (eventQueue.Count > 0 && Interlocked.CompareExchange(ref batchExecuting, 1, 0) == 0);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment