Skip to content

Instantly share code, notes, and snippets.

@dcomartin
Created May 11, 2023 18:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dcomartin/fb7c084260f4926bd88640ff47ec7b1d to your computer and use it in GitHub Desktop.
Save dcomartin/fb7c084260f4926bd88640ff47ec7b1d to your computer and use it in GitHub Desktop.
/// <summary>
/// Repository abstraction for loading and persisting an entity of type
/// <typeparamref name="T"/> by its string identifier.
/// </summary>
/// <typeparam name="T">The entity type handled by the repository.</typeparam>
public interface IEventStreamRepository<T>
{
/// <summary>Asynchronously loads the entity identified by <paramref name="id"/>.</summary>
/// <param name="id">Identifier of the entity to load.</param>
/// <returns>A task producing the loaded entity.</returns>
Task<T> Get(string id);
/// <summary>Asynchronously persists <paramref name="entity"/>.</summary>
/// <param name="entity">The entity to persist.</param>
/// <returns>A task that completes when the save has finished.</returns>
Task Save(T entity);
}
/// <summary>
/// EventStore-backed repository for <see cref="WarehouseProduct"/>.
/// Events are stored in the stream <c>WarehouseProduct-{sku}</c>; snapshots of the
/// aggregate state are stored in the separate stream <c>WarehouseProduct-Snapshot-{sku}</c>
/// and written whenever a save crosses a multiple of <see cref="SnapshotInterval"/> events.
/// </summary>
/// <remarks>
/// The type is sealed because its only constructor is private (instances come from
/// <see cref="Factory()"/>), and it implements <see cref="IDisposable"/> so the owned
/// connection can be released with a <c>using</c> statement.
/// </remarks>
public sealed class WarehouseProductEventStoreStream : IEventStreamRepository<WarehouseProduct>, IDisposable
{
    /// <summary>Number of events between two snapshots.</summary>
    private const int SnapshotInterval = 100;

    /// <summary>Maximum number of events requested per read while replaying a stream.</summary>
    private const int ReadPageSize = 200;

    private readonly IEventStoreConnection _connection;

    /// <summary>
    /// Creates a repository connected to the node at 127.0.0.1:1113 using the
    /// <c>admin</c>/<c>changeit</c> credentials.
    /// </summary>
    /// <returns>A task producing a connected repository.</returns>
    public static async Task<WarehouseProductEventStoreStream> Factory()
    {
        // NOTE(review): hard-coded endpoint and credentials are kept only so existing
        // callers behave as before; prefer the overload below and supply these from
        // configuration rather than source.
        return await Factory(
            new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1113),
            new UserCredentials("admin", "changeit"));
    }

    /// <summary>
    /// Creates a repository connected to the node at <paramref name="endPoint"/>.
    /// </summary>
    /// <param name="endPoint">TCP endpoint of the EventStore node.</param>
    /// <param name="credentials">Default credentials used for operations on the connection.</param>
    /// <returns>A task producing a connected repository.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="endPoint"/> or <paramref name="credentials"/> is null.
    /// </exception>
    public static async Task<WarehouseProductEventStoreStream> Factory(IPEndPoint endPoint, UserCredentials credentials)
    {
        if (endPoint is null)
        {
            throw new ArgumentNullException(nameof(endPoint));
        }

        if (credentials is null)
        {
            throw new ArgumentNullException(nameof(credentials));
        }

        var connectionSettings = ConnectionSettings.Create()
            .KeepReconnecting()
            .KeepRetrying()
            .SetHeartbeatTimeout(TimeSpan.FromMinutes(5))
            .SetHeartbeatInterval(TimeSpan.FromMinutes(1))
            .DisableTls()
            .DisableServerCertificateValidation()
            .SetDefaultUserCredentials(credentials)
            .Build();

        var conn = EventStoreConnection.Create(connectionSettings, endPoint);
        try
        {
            await conn.ConnectAsync();
        }
        catch
        {
            // Do not leak the connection when the initial connect fails.
            conn.Dispose();
            throw;
        }

        return new WarehouseProductEventStoreStream(conn);
    }

    private WarehouseProductEventStoreStream(IEventStoreConnection connection)
    {
        _connection = connection;
    }

    /// <summary>
    /// Loads the product for <paramref name="sku"/>: starts from the latest snapshot
    /// (or an empty one) and applies every event stored after the snapshot's version.
    /// </summary>
    /// <param name="sku">SKU identifying the product; used to derive the stream names.</param>
    /// <returns>A task producing the rehydrated product.</returns>
    /// <exception cref="InvalidOperationException">The stream contains an unknown event type.</exception>
    public async Task<WarehouseProduct> Get(string sku)
    {
        var streamName = GetStreamName(sku);
        var snapshot = await GetSnapshot(sku);
        var warehouseProduct = new WarehouseProduct(sku, snapshot.State);

        // NOTE(review): when no snapshot exists this relies on `new Snapshot().Version`
        // being -1 so that reading starts at event 0. If it defaults to 0 the first
        // event of the stream is skipped - confirm against the Snapshot type.
        var nextSliceStart = snapshot.Version + 1;

        StreamEventsSlice currentSlice;
        do
        {
            currentSlice = await _connection.ReadStreamEventsForwardAsync(
                streamName,
                nextSliceStart,
                ReadPageSize,
                false);

            nextSliceStart = currentSlice.NextEventNumber;

            foreach (var evnt in currentSlice.Events)
            {
                warehouseProduct.ApplyEvent(DeserializeEvent(evnt));
            }
        } while (!currentSlice.IsEndOfStream);

        return warehouseProduct;
    }

    /// <summary>
    /// Appends the product's uncommitted events to its stream in a single write and,
    /// if that write crossed a multiple of <see cref="SnapshotInterval"/> events,
    /// stores a snapshot of the current state.
    /// </summary>
    /// <param name="warehouseProduct">The product whose uncommitted events are persisted.</param>
    /// <returns>A task that completes when the events (and snapshot, if any) are written.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="warehouseProduct"/> is null.</exception>
    public async Task Save(WarehouseProduct warehouseProduct)
    {
        if (warehouseProduct is null)
        {
            throw new ArgumentNullException(nameof(warehouseProduct));
        }

        var pending = warehouseProduct.GetUncommittedEvents()
            .Select(evnt => new EventData(
                Guid.NewGuid(),
                evnt.EventType,
                true,
                Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(evnt)),
                Encoding.UTF8.GetBytes("{}")))
            .ToArray();

        if (pending.Length == 0)
        {
            // Nothing to persist, and therefore nothing that could trigger a snapshot.
            return;
        }

        var streamName = GetStreamName(warehouseProduct.Sku);

        // One call for the whole batch so a failure cannot leave a partially written save.
        // NOTE(review): ExpectedVersion.Any disables optimistic concurrency; the version
        // the aggregate was loaded at is not available here.
        var result = await _connection.AppendToStreamAsync(streamName, ExpectedVersion.Any, pending);

        var lastVersion = result.NextExpectedVersion;
        var firstVersion = lastVersion - pending.Length + 1;

        if (CrossesSnapshotBoundary(firstVersion, lastVersion))
        {
            await AppendSnapshot(warehouseProduct, lastVersion);
        }

        // NOTE(review): the uncommitted events are not cleared on the aggregate here;
        // saving the same instance twice would append them again - verify whether
        // WarehouseProduct exposes a way to mark them committed.
    }

    /// <summary>
    /// Returns true when the events numbered <paramref name="firstVersion"/> through
    /// <paramref name="lastVersion"/> (zero-based, inclusive) take the stream's event
    /// count across a multiple of <see cref="SnapshotInterval"/>.
    /// </summary>
    private static bool CrossesSnapshotBoundary(long firstVersion, long lastVersion)
    {
        // Event count before the write is firstVersion; after it is lastVersion + 1.
        return (lastVersion + 1) / SnapshotInterval > firstVersion / SnapshotInterval;
    }

    /// <summary>Builds the name of the event stream for <paramref name="sku"/>.</summary>
    private static string GetStreamName(string sku)
    {
        return $"WarehouseProduct-{sku}";
    }

    /// <summary>Builds the name of the snapshot stream for <paramref name="sku"/>.</summary>
    private static string GetSnapshotStreamName(string sku)
    {
        return $"WarehouseProduct-Snapshot-{sku}";
    }

    /// <summary>
    /// Reads the most recent entry of the snapshot stream for <paramref name="sku"/>.
    /// </summary>
    /// <returns>
    /// The deserialized snapshot, or a new <see cref="Snapshot"/> when the stream has no
    /// entries or the stored payload deserializes to null.
    /// </returns>
    private async Task<Snapshot> GetSnapshot(string sku)
    {
        var streamName = GetSnapshotStreamName(sku);

        // Reading one event backward from the end yields the latest snapshot.
        var slice = await _connection.ReadStreamEventsBackwardAsync(streamName, (long)StreamPosition.End, 1, false);
        if (!slice.Events.Any())
        {
            return new Snapshot();
        }

        var json = Encoding.UTF8.GetString(slice.Events.First().Event.Data);
        return JsonConvert.DeserializeObject<Snapshot>(json) ?? new Snapshot();
    }

    /// <summary>
    /// Deserializes the JSON payload of <paramref name="evnt"/> into the event type
    /// named by its <c>EventType</c>.
    /// </summary>
    /// <exception cref="InvalidOperationException">The event type is not one of the known types.</exception>
    private IEvent DeserializeEvent(ResolvedEvent evnt)
    {
        var json = Encoding.UTF8.GetString(evnt.Event.Data);
        return evnt.Event.EventType switch
        {
            "InventoryAdjusted" => JsonConvert.DeserializeObject<InventoryAdjusted>(json),
            "ProductShipped" => JsonConvert.DeserializeObject<ProductShipped>(json),
            "ProductReceived" => JsonConvert.DeserializeObject<ProductReceived>(json),
            _ => throw new InvalidOperationException($"Unknown Event: {evnt.Event.EventType}")
        };
    }

    /// <summary>
    /// Writes a snapshot of the product's current state, tagged with the event
    /// <paramref name="version"/> it corresponds to, to the snapshot stream.
    /// </summary>
    private async Task AppendSnapshot(WarehouseProduct warehouseProduct, long version)
    {
        var streamName = GetSnapshotStreamName(warehouseProduct.Sku);
        var snapshot = new Snapshot
        {
            State = warehouseProduct.GetState(),
            Version = version
        };

        var metadata = Encoding.UTF8.GetBytes("{}");
        var data = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(snapshot));
        var evt = new EventData(Guid.NewGuid(), "snapshot", true, data, metadata);

        await _connection.AppendToStreamAsync(streamName, ExpectedVersion.Any, evt);
    }

    /// <summary>Disposes the underlying EventStore connection.</summary>
    public void Dispose()
    {
        _connection?.Dispose();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment