Skip to content

Instantly share code, notes, and snippets.

@rcknight
Last active January 10, 2023 21:41
Show Gist options
  • Save rcknight/6c850894c9c199e2f6e1bc30e079e1cb to your computer and use it in GitHub Desktop.
Save rcknight/6c850894c9c199e2f6e1bc30e079e1cb to your computer and use it in GitHub Desktop.
Event Store Checkpoint Weirdness
using System.Text;
using EventStore.Client;
var settings = EventStoreClientSettings.Create("esdb+discover://localhost:2113?tls=false");
var esClient = new EventStoreClient(settings);
// Give us something to catch up
await WriteEvents(1000);
ulong lastEventPosition = 0;
await CreateSubscription();
await WriteEvents(500, 1000);
async Task CreateSubscription()
{
var filter = new SubscriptionFilterOptions(
EventTypeFilter.ExcludeSystemEvents(),
checkpointReached: (s, p, c) =>
{
if (lastEventPosition > p.CommitPosition)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.Write("[OUTDATED?] ");
}
Console.WriteLine($"Checkpoint reached at {p.CommitPosition}");
Console.ResetColor();
return Task.CompletedTask;
});
await esClient.SubscribeToAllAsync(FromAll.Start,
(s, e, c) =>
{
Console.WriteLine(
$"{e.Event.EventType} @ {e.Event.Position.CommitPosition}");
lastEventPosition = e.Event.Position.CommitPosition;
return Task.CompletedTask;
},
filterOptions: filter);
}
async Task WriteEvents(int numEvents, int msDelay = 0)
{
for (var i = 0; i < numEvents; i++)
{
await esClient.AppendToStreamAsync(Uuid.NewUuid().ToString(), StreamState.Any,
new[]
{
new EventData(Uuid.NewUuid(), "TestEvent", Encoding.UTF8.GetBytes($"{i}"), Encoding.UTF8.GetBytes($"{i}"))
});
Console.WriteLine($"Written Event {i}");
await Task.Delay(msDelay);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment