|
using System; |
|
using System.Collections.Generic; |
|
using System.Data.SqlClient; |
|
using System.Diagnostics; |
|
using System.Linq; |
|
using System.Threading; |
|
using System.Threading.Tasks; |
|
|
|
using Cedar.EventStore; |
|
using Cedar.EventStore.Streams; |
|
using Cedar.EventStore.Subscriptions; |
|
|
|
using JG.DonationCollection.Application.CommandProcessing; |
|
using JG.DonationCollection.Application.Infrastructure; |
|
using JG.DonationCollection.Application.Infrastructure.Logging; |
|
using JG.DonationCollection.Application.Ports; |
|
|
|
using Polly; |
|
|
|
namespace JG.DonationCollection.Persistence.EventStorage |
|
{ |
|
public class CedarEventStorage : IEventStorage |
|
{ |
|
private static readonly ContextualPolicy DatabaseAccessPolicy; |
|
|
|
private readonly string _connectionString; |
|
|
|
private readonly IJsonMessageSerialiser _serialiser; |
|
|
|
private readonly IPerformanceMonitor _performanceMonitor; |
|
|
|
static CedarEventStorage() |
|
{ |
|
DatabaseAccessPolicy = |
|
Policy |
|
.Handle<SqlException>( |
|
x => |
|
x.Number == -2 // timeout |
|
|| x.ErrorCode == 11 // general network error |
|
) |
|
.WaitAndRetry( |
|
3, |
|
i => TimeSpan.FromMilliseconds(i * 25), |
|
(e, t, c) => { Log.ThatException("Failed connecting to event storage", e); }); |
|
} |
|
|
|
public CedarEventStorage(string connectionString, IJsonMessageSerialiser serialiser, IPerformanceMonitor performanceMonitor) |
|
{ |
|
_connectionString = connectionString; |
|
_serialiser = serialiser; |
|
_performanceMonitor = performanceMonitor; |
|
} |
|
|
|
public async Task<Maybe<IEnumerable<IMessageEnvelope<IMessage>>>> TryGetAsync(string streamName) |
|
{ |
|
try |
|
{ |
|
var result = Maybe.Of(await GetAsync(streamName)); |
|
|
|
return result; |
|
} |
|
catch (StreamNotFoundException) |
|
{ |
|
return Maybe<IEnumerable<IMessageEnvelope<IMessage>>>.Empty; |
|
} |
|
} |
|
|
|
public async Task<IEnumerable<IMessageEnvelope<IMessage>>> GetAsync(string streamName) |
|
{ |
|
return await DatabaseAccessPolicy.Execute( |
|
async () => |
|
{ |
|
var start = 0; |
|
const int BatchSize = 500; |
|
|
|
var timer = Stopwatch.StartNew(); |
|
|
|
using ( |
|
var eventStore = new MsSqlEventStore(_connectionString, Poller.CreateEventStoreNotifier())) |
|
{ |
|
StreamEventsPage eventsPage; |
|
var events = new List<IMessageEnvelope<IMessage>>(); |
|
|
|
do |
|
{ |
|
eventsPage = await eventStore.ReadStreamForwards(streamName, start, BatchSize); |
|
|
|
// TODO: throw DC exception |
|
if (eventsPage.Status == PageReadStatus.StreamDeleted) |
|
{ |
|
throw new Exception("Stream deleted"); |
|
} |
|
|
|
if (eventsPage.Status == PageReadStatus.StreamNotFound) |
|
{ |
|
throw new StreamNotFoundException(streamName); |
|
} |
|
|
|
events.AddRange( |
|
eventsPage.Events.Select( |
|
@event => |
|
_serialiser.DeserializeEvent(@event.Type, @event.JsonMetadata, @event.JsonData))); |
|
|
|
start = eventsPage.NextStreamVersion; |
|
} |
|
while (!eventsPage.IsEndOfStream); |
|
|
|
_performanceMonitor.NotifyTimer(PerformanceCounters.CedarEventStore.GetAsyncTimer, timer.Elapsed); |
|
_performanceMonitor.NotifyCounter(PerformanceCounters.CedarEventStore.EventsRead, events.Count); |
|
_performanceMonitor.NotifyGauge(PerformanceCounters.CedarEventStore.StreamSize, events.Count); |
|
|
|
return events; |
|
} |
|
}); |
|
} |
|
|
|
public async Task AppendAsync(string streamName, int expectedVersion, IEnumerable<IMessageEnvelope<IMessage>> events) |
|
{ |
|
await DatabaseAccessPolicy.Execute( |
|
async () => |
|
{ |
|
var timer = Stopwatch.StartNew(); |
|
|
|
using (var es = new MsSqlEventStore(_connectionString, Poller.CreateEventStoreNotifier())) |
|
{ |
|
var eventToStore = |
|
events.Select( |
|
e => |
|
new NewStreamEvent( |
|
CombGuid.NewGuid(), |
|
e.MetaData.MessageType, |
|
_serialiser.Serialize(e.Message), |
|
_serialiser.Serialize(e.MetaData))); |
|
|
|
var newStreamEvents = eventToStore.ToArray(); |
|
|
|
if (newStreamEvents.Length == 0) |
|
{ |
|
return; |
|
} |
|
|
|
await |
|
es.AppendToStream( |
|
streamName, |
|
expectedVersion < 0 ? expectedVersion : expectedVersion - 1, |
|
newStreamEvents, |
|
CancellationToken.None); |
|
|
|
_performanceMonitor.NotifyTimer(PerformanceCounters.CedarEventStore.AppendAsyncTimer, timer.Elapsed); |
|
} |
|
}); |
|
} |
|
} |
|
} |