Skip to content

Instantly share code, notes, and snippets.

@bwaterschoot
Created March 15, 2016 12:10
Show Gist options
  • Save bwaterschoot/77362620bbf058b58285 to your computer and use it in GitHub Desktop.
Save bwaterschoot/77362620bbf058b58285 to your computer and use it in GitHub Desktop.
Reading back a large number of events in batches

We wanted to test reading back events in batches, so we initialise a stream to contain 7500 events. When re-reading the events however it doesn't always return the same number of events. We think it's because the ordinal and streamversion aren't in sync, by which we mean that the highest ordinal for a stream doesn't contain the highest streamversion.

Unless there's something stupid we're doing / forgetting ?

In SQL: http://postimg.org/image/7m87v83l9/

/*SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;*/
BEGIN TRANSACTION AppendStream;
DECLARE @streamIdInternal AS INT;
DECLARE @latestStreamVersion AS INT;
SELECT @streamIdInternal = dbo.Streams.IdInternal
FROM dbo.Streams
WHERE dbo.Streams.Id = @streamId;
IF @streamIdInternal IS NULL
BEGIN
INSERT INTO dbo.Streams (Id, IdOriginal) VALUES (@streamId, @streamIdOriginal);
SELECT @streamIdInternal = SCOPE_IDENTITY();
INSERT INTO dbo.Events (StreamIdInternal, StreamVersion, Id, Created, [Type], JsonData, JsonMetadata)
SELECT @streamIdInternal,
StreamVersion,
Id,
Created,
[Type],
JsonData,
JsonMetadata
FROM @newEvents;
END
ELSE
BEGIN
SELECT TOP(1)
@latestStreamVersion = dbo.Events.StreamVersion
FROM dbo.Events
WHERE dbo.Events.StreamIDInternal = @streamIdInternal
ORDER BY dbo.Events.Ordinal DESC;
INSERT INTO dbo.Events (StreamIdInternal, StreamVersion, Id, Created, [Type], JsonData, JsonMetadata)
SELECT @streamIdInternal,
StreamVersion + @latestStreamVersion + 1,
Id,
Created,
[Type],
JsonData,
JsonMetadata
FROM @newEvents;
END
COMMIT TRANSACTION AppendStream;
using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Cedar.EventStore;
using Cedar.EventStore.Streams;
using Cedar.EventStore.Subscriptions;
using JG.DonationCollection.Application.CommandProcessing;
using JG.DonationCollection.Application.Infrastructure;
using JG.DonationCollection.Application.Infrastructure.Logging;
using JG.DonationCollection.Application.Ports;
using Polly;
namespace JG.DonationCollection.Persistence.EventStorage
{
public class CedarEventStorage : IEventStorage
{
private static readonly ContextualPolicy DatabaseAccessPolicy;
private readonly string _connectionString;
private readonly IJsonMessageSerialiser _serialiser;
private readonly IPerformanceMonitor _performanceMonitor;
static CedarEventStorage()
{
DatabaseAccessPolicy =
Policy
.Handle<SqlException>(
x =>
x.Number == -2 // timeout
|| x.ErrorCode == 11 // general network error
)
.WaitAndRetry(
3,
i => TimeSpan.FromMilliseconds(i * 25),
(e, t, c) => { Log.ThatException("Failed connecting to event storage", e); });
}
public CedarEventStorage(string connectionString, IJsonMessageSerialiser serialiser, IPerformanceMonitor performanceMonitor)
{
_connectionString = connectionString;
_serialiser = serialiser;
_performanceMonitor = performanceMonitor;
}
public async Task<Maybe<IEnumerable<IMessageEnvelope<IMessage>>>> TryGetAsync(string streamName)
{
try
{
var result = Maybe.Of(await GetAsync(streamName));
return result;
}
catch (StreamNotFoundException)
{
return Maybe<IEnumerable<IMessageEnvelope<IMessage>>>.Empty;
}
}
public async Task<IEnumerable<IMessageEnvelope<IMessage>>> GetAsync(string streamName)
{
return await DatabaseAccessPolicy.Execute(
async () =>
{
var start = 0;
const int BatchSize = 500;
var timer = Stopwatch.StartNew();
using (
var eventStore = new MsSqlEventStore(_connectionString, Poller.CreateEventStoreNotifier()))
{
StreamEventsPage eventsPage;
var events = new List<IMessageEnvelope<IMessage>>();
do
{
eventsPage = await eventStore.ReadStreamForwards(streamName, start, BatchSize);
// TODO: throw DC exception
if (eventsPage.Status == PageReadStatus.StreamDeleted)
{
throw new Exception("Stream deleted");
}
if (eventsPage.Status == PageReadStatus.StreamNotFound)
{
throw new StreamNotFoundException(streamName);
}
events.AddRange(
eventsPage.Events.Select(
@event =>
_serialiser.DeserializeEvent(@event.Type, @event.JsonMetadata, @event.JsonData)));
start = eventsPage.NextStreamVersion;
}
while (!eventsPage.IsEndOfStream);
_performanceMonitor.NotifyTimer(PerformanceCounters.CedarEventStore.GetAsyncTimer, timer.Elapsed);
_performanceMonitor.NotifyCounter(PerformanceCounters.CedarEventStore.EventsRead, events.Count);
_performanceMonitor.NotifyGauge(PerformanceCounters.CedarEventStore.StreamSize, events.Count);
return events;
}
});
}
public async Task AppendAsync(string streamName, int expectedVersion, IEnumerable<IMessageEnvelope<IMessage>> events)
{
await DatabaseAccessPolicy.Execute(
async () =>
{
var timer = Stopwatch.StartNew();
using (var es = new MsSqlEventStore(_connectionString, Poller.CreateEventStoreNotifier()))
{
var eventToStore =
events.Select(
e =>
new NewStreamEvent(
CombGuid.NewGuid(),
e.MetaData.MessageType,
_serialiser.Serialize(e.Message),
_serialiser.Serialize(e.MetaData)));
var newStreamEvents = eventToStore.ToArray();
if (newStreamEvents.Length == 0)
{
return;
}
await
es.AppendToStream(
streamName,
expectedVersion < 0 ? expectedVersion : expectedVersion - 1,
newStreamEvents,
CancellationToken.None);
_performanceMonitor.NotifyTimer(PerformanceCounters.CedarEventStore.AppendAsyncTimer, timer.Elapsed);
}
});
}
}
}
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Cedar.EventStore.Streams;
using JG.DonationCollection.Application.CommandProcessing;
using JG.DonationCollection.Application.CommandProcessing.Domain.Events;
using JG.DonationCollection.Application.Infrastructure;
using JG.DonationCollection.Application.Ports;
using JG.DonationCollection.Persistence.EventStorage;
using Moq;
using NUnit.Framework;
namespace JG.DonationCollection.Persistence.Tests.EventStorageTests
{
public class Given_an_initialised_event_store_with_a_large_stream
{
private LocalMsSqlStore _db;
private CedarEventStorage _eventStorage;
private List<IMessageEnvelope<IMessage>> _eventsToWrite;
[SetUp]
public async Task When_events_are_saved()
{
await _eventStorage.AppendAsync("my-stream", ExpectedVersion.Any, _eventsToWrite);
}
[Test]
public async Task Then_the_events_can_be_read_back()
{
var readEvents = await _eventStorage.GetAsync("my-stream");
EventStreamAssert.AssertEventsAreSame(readEvents, _eventsToWrite);
}
[OneTimeSetUp]
public async Task Initialise_Event_Storage()
{
_db = new LocalMsSqlStore();
await _db.InitialiseStoreAsync(Scripts.InitializeStore);
_eventStorage = new CedarEventStorage(
_db.ConnectionString,
new JsonMessageSerialiser(new[] { typeof(MyEvent) }),
Mock.Of<IPerformanceMonitor>());
_eventsToWrite = new List<IMessageEnvelope<IMessage>>();
var largeStreamCount = 7500;
for (int i = 0; i < largeStreamCount; i++)
{
var envelope = MessageEnvelope.For(
new MyEvent("Hello"),
new MetaData
{
MessageType = MessageContract.GetContractName(typeof(DonationMessageRemoved)),
CorrelationId = CombGuid.NewGuid().ToString("N"),
ApplicationName = "My Application",
ApplicationVersion = "6.0.0.1",
CausationId = CombGuid.NewGuid().ToString("N"),
MachineId = "My Machine",
MessageId = CombGuid.NewGuid().ToString("N"),
SecurityContext = new[] { new ClaimAssertion("MyClaim", "IsThis") },
Timestamp = DateTimeOffset.UtcNow
});
_eventsToWrite.Add(envelope);
}
}
}
}
/* SQL Server 2008+ */
DECLARE @streamIdInternal AS INT
DECLARE @isDeleted AS BIT
SELECT @streamIdInternal = dbo.Streams.IdInternal,
@isDeleted = dbo.Streams.IsDeleted
FROM dbo.Streams
WHERE dbo.Streams.Id = @streamId
SELECT @isDeleted;
SELECT TOP(@count)
dbo.Events.StreamVersion,
dbo.Events.Ordinal,
dbo.Events.Id AS EventId,
dbo.Events.Created,
dbo.Events.Type,
dbo.Events.JsonData,
dbo.Events.JsonMetadata
FROM dbo.Events
INNER JOIN dbo.Streams
ON dbo.Events.StreamIdInternal = dbo.Streams.IdInternal
WHERE dbo.Events.StreamIDInternal = @streamIDInternal AND dbo.Events.StreamVersion >= @StreamVersion
ORDER BY dbo.Events.Ordinal; //IF WE CHANGE THIS TO ORDER BY StreamVersion IT DOES WORK
SELECT TOP(1)
dbo.Events.StreamVersion
FROM dbo.Events
WHERE dbo.Events.StreamIDInternal = @streamIDInternal
ORDER BY dbo.Events.Ordinal DESC; //IF WE CHANGE THIS TO ORDER BY StreamVersion IT DOES WORK
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment