-
-
Save dcomartin/fb7c084260f4926bd88640ff47ec7b1d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public interface IEventStreamRepository<T> | |
{ | |
Task<T> Get(string id); | |
Task Save(T entity); | |
} | |
public class WarehouseProductEventStoreStream : IEventStreamRepository<WarehouseProduct> | |
{ | |
private const int SnapshotInterval = 100; | |
private readonly IEventStoreConnection _connection; | |
public static async Task<WarehouseProductEventStoreStream> Factory() | |
{ | |
var connectionSettings = ConnectionSettings.Create() | |
.KeepReconnecting() | |
.KeepRetrying() | |
.SetHeartbeatTimeout(TimeSpan.FromMinutes(5)) | |
.SetHeartbeatInterval(TimeSpan.FromMinutes(1)) | |
.DisableTls() | |
.DisableServerCertificateValidation() | |
.SetDefaultUserCredentials(new UserCredentials("admin", "changeit")) | |
.Build(); | |
var conn = EventStoreConnection.Create(connectionSettings, new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1113)); | |
await conn.ConnectAsync(); | |
return new WarehouseProductEventStoreStream(conn); | |
} | |
private WarehouseProductEventStoreStream(IEventStoreConnection connection) | |
{ | |
_connection = connection; | |
} | |
public async Task<WarehouseProduct> Get(string sku) | |
{ | |
var streamName = GetStreamName(sku); | |
var snapshot = await GetSnapshot(sku); | |
var warehouseProduct = new WarehouseProduct(sku, snapshot.State); | |
StreamEventsSlice currentSlice; | |
var nextSliceStart = snapshot.Version + 1; | |
do | |
{ | |
currentSlice = await _connection.ReadStreamEventsForwardAsync( | |
streamName, | |
nextSliceStart, | |
200, | |
false | |
); | |
nextSliceStart = currentSlice.NextEventNumber; | |
foreach (var evnt in currentSlice.Events) | |
{ | |
var eventObj = DeserializeEvent(evnt); | |
warehouseProduct.ApplyEvent(eventObj); | |
} | |
} while (!currentSlice.IsEndOfStream); | |
return warehouseProduct; | |
} | |
public async Task Save(WarehouseProduct warehouseProduct) | |
{ | |
var streamName = GetStreamName(warehouseProduct.Sku); | |
var newEvents = warehouseProduct.GetUncommittedEvents(); | |
long version = 0; | |
foreach (var evnt in newEvents) | |
{ | |
var data = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(evnt)); | |
var metadata = Encoding.UTF8.GetBytes("{}"); | |
var evt = new EventData(Guid.NewGuid(), evnt.EventType, true, data, metadata); | |
var result = await _connection.AppendToStreamAsync(streamName, ExpectedVersion.Any, evt); | |
version = result.NextExpectedVersion; | |
} | |
if ((version + 1) >= SnapshotInterval && (version + 1) % SnapshotInterval == 0) | |
{ | |
await AppendSnapshot(warehouseProduct, version); | |
} | |
} | |
private string GetStreamName(string sku) | |
{ | |
return $"WarehouseProduct-{sku}"; | |
} | |
private string GetSnapshotStreamName(string sku) | |
{ | |
return $"WarehouseProduct-Snapshot-{sku}"; | |
} | |
private async Task<Snapshot> GetSnapshot(string sku) | |
{ | |
var streamName = GetSnapshotStreamName(sku); | |
var slice = await _connection.ReadStreamEventsBackwardAsync(streamName, (long)StreamPosition.End, 1, false); | |
if (slice.Events.Any()) | |
{ | |
var evnt = slice.Events.First(); | |
var json = Encoding.UTF8.GetString(evnt.Event.Data); | |
return JsonConvert.DeserializeObject<Snapshot>(json); | |
} | |
return new Snapshot(); | |
} | |
private IEvent DeserializeEvent(ResolvedEvent evnt) | |
{ | |
var json = Encoding.UTF8.GetString(evnt.Event.Data); | |
return evnt.Event.EventType switch | |
{ | |
"InventoryAdjusted" => JsonConvert.DeserializeObject<InventoryAdjusted>(json), | |
"ProductShipped" => JsonConvert.DeserializeObject<ProductShipped>(json), | |
"ProductReceived" => JsonConvert.DeserializeObject<ProductReceived>(json), | |
_ => throw new InvalidOperationException($"Unknown Event: {evnt.Event.EventType}") | |
}; | |
} | |
private async Task AppendSnapshot(WarehouseProduct warehouseProduct, long version) | |
{ | |
var streamName = GetSnapshotStreamName(warehouseProduct.Sku); | |
var state = warehouseProduct.GetState(); | |
var snapshot = new Snapshot | |
{ | |
State = state, | |
Version = version | |
}; | |
var metadata = Encoding.UTF8.GetBytes("{}"); | |
var data = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(snapshot)); | |
var evt = new EventData(Guid.NewGuid(), "snapshot", true, data, metadata); | |
await _connection.AppendToStreamAsync(streamName, ExpectedVersion.Any, evt); | |
} | |
public void Dispose() | |
{ | |
_connection?.Dispose(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment