Skip to content

Instantly share code, notes, and snippets.

@kristofdegrave
Last active May 6, 2021 10:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kristofdegrave/f254b8e97529e4d0b532be444838eacb to your computer and use it in GitHub Desktop.
Save kristofdegrave/f254b8e97529e4d0b532be444838eacb to your computer and use it in GitHub Desktop.
TableStorageSagaStorage
using Azure;
using Azure.Data.Tables;
using Newtonsoft.Json;
using Rebus.Exceptions;
using Rebus.Sagas;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Be.Vlaanderen.Pbs.Runner.Engine.Worker
{
/// <summary>
/// Rebus <see cref="ISagaStorage"/> implementation that keeps saga data in an Azure Table.
/// Every saga is stored as one entity in a single partition with the saga id as row key, the
/// JSON-serialized saga in the "SagaData" property, and one string property per correlation
/// property (plus "SagaId" and "Revision") so sagas can be looked up with a table query.
/// </summary>
public class TableStorageSagaStorage : ISagaStorage
{
    private const string SagaPartitionKey = "Saga";
    private const string IdPropertyName = nameof(ISagaData.Id);
    private const string SagaDataProperty = "SagaData";
    private const string RevisionProperty = "Revision";

    // HTTP status codes returned by the table service that are translated below.
    private const int NotFoundStatus = 404;
    private const int ConflictStatus = 409;
    private const int PreconditionFailedStatus = 412;

    private static readonly string[] RevisionSelect = { RevisionProperty, "ETag" };
    private static readonly string[] SagaDataSelect = { SagaDataProperty };

    // NOTE(review): TypeNameHandling.All lets the stored JSON name the CLR type to instantiate.
    // It is kept because existing rows were written with it, but it is only safe while the
    // table can be written exclusively by trusted code.
    private static readonly JsonSerializerSettings SerializerSettings = new JsonSerializerSettings
    {
        TypeNameHandling = TypeNameHandling.All
    };

    private readonly TableClient tableClient;

    /// <summary>Creates the storage on top of the table <paramref name="tableName"/> of the given storage account.</summary>
    public TableStorageSagaStorage(string connectionString, string tableName = "SagaData")
        : this(new TableClient(connectionString, tableName))
    {
    }

    /// <summary>Creates the storage on top of an existing <see cref="TableClient"/>.</summary>
    /// <exception cref="ArgumentNullException"><paramref name="tableClient"/> is null.</exception>
    public TableStorageSagaStorage(TableClient tableClient)
    {
        this.tableClient = tableClient ?? throw new ArgumentNullException(nameof(tableClient));
    }

    /// <summary>Creates the backing table when it does not exist yet.</summary>
    public async Task EnsureCreated()
    {
        await tableClient.CreateIfNotExistsAsync();
    }

    /// <summary>
    /// Deletes the stored saga. The in-memory revision is incremented only after the delete succeeded.
    /// </summary>
    /// <exception cref="ConcurrencyException">
    /// The saga does not exist, its stored revision differs from <c>sagaData.Revision</c>,
    /// or the entity was modified between the revision check and the delete.
    /// </exception>
    public async Task Delete(ISagaData sagaData)
    {
        var stored = await LoadAndVerifyRevision(sagaData, "delete");

        try
        {
            // Passing the ETag makes the delete conditional, closing the race between the
            // revision check above and the actual delete.
            await tableClient.DeleteEntityAsync(SagaPartitionKey, sagaData.Id.ToString(), stored.ETag);
        }
        catch (RequestFailedException ex) when (ex.Status == PreconditionFailedStatus)
        {
            throw new ConcurrencyException($"Saga data with ID {sagaData.Id} was modified concurrently and could not be deleted: {ex.Message}");
        }

        sagaData.Revision++; // Needed to be compliant with the tests.
    }

    /// <summary>
    /// Finds the saga of type <paramref name="sagaDataType"/> whose property
    /// <paramref name="propertyName"/> has the value <paramref name="propertyValue"/>.
    /// </summary>
    /// <returns>The deserialized saga data, or null when no stored saga of that type matches.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="sagaDataType"/> or <paramref name="propertyName"/> is null.</exception>
    public async Task<ISagaData> Find(Type sagaDataType, string propertyName, object propertyValue)
    {
        if (sagaDataType == null) throw new ArgumentNullException(nameof(sagaDataType));
        if (propertyName == null) throw new ArgumentNullException(nameof(propertyName));

        await EnsureCreated();

        // The saga id is the row key; every other correlation property is a regular column.
        var column = propertyName.Equals(IdPropertyName, StringComparison.OrdinalIgnoreCase)
            ? nameof(TableEntity.RowKey)
            : propertyName;
        var value = EscapeFilterValue(propertyValue?.ToString() ?? "");
        var filter = $"{nameof(TableEntity.PartitionKey)} eq '{SagaPartitionKey}' and {column} eq '{value}'";

        // Enumerated by hand (rather than with 'await foreach') so the file does not require C# 8.
        var matches = tableClient.QueryAsync<TableEntity>(filter: filter, select: SagaDataSelect).GetAsyncEnumerator();
        try
        {
            while (await matches.MoveNextAsync())
            {
                if (!matches.Current.TryGetValue(SagaDataProperty, out object data) || data == null)
                {
                    continue;
                }

                try
                {
                    if (JsonConvert.DeserializeObject(data.ToString(), sagaDataType, SerializerSettings) is ISagaData sagaData)
                    {
                        return sagaData;
                    }
                }
                catch (JsonException)
                {
                    // Sagas of different types share this table and may share a correlation value.
                    // A row whose embedded type is incompatible with sagaDataType is not a match.
                }
            }
        }
        finally
        {
            await matches.DisposeAsync();
        }

        return null;
    }

    /// <summary>
    /// Stores a new saga. Only the saga id is enforced to be unique; uniqueness of the
    /// correlation property values is not checked.
    /// </summary>
    /// <exception cref="InvalidOperationException">The saga id is empty or the revision is not 0.</exception>
    /// <exception cref="ConcurrencyException">A saga with the same id is already stored.</exception>
    public async Task Insert(ISagaData sagaData, IEnumerable<ISagaCorrelationProperty> correlationProperties)
    {
        await EnsureCreated();

        if (sagaData.Id == Guid.Empty)
        {
            throw new InvalidOperationException($"Saga data {sagaData.GetType()} has an uninitialized Id property!");
        }

        if (sagaData.Revision != 0)
        {
            throw new InvalidOperationException($"Attempted to insert saga data with ID {sagaData.Id} and revision {sagaData.Revision}, but revision must be 0 on first insert!");
        }

        var entity = ToTableEntity(sagaData, correlationProperties);
        try
        {
            await tableClient.AddEntityAsync(entity);
        }
        catch (RequestFailedException ex) when (ex.Status == ConflictStatus)
        {
            throw new ConcurrencyException($"Saga data with ID {sagaData.Id} already exists!");
        }
    }

    /// <summary>
    /// Replaces the stored saga with <paramref name="sagaData"/> and increments its revision.
    /// When the update fails the in-memory revision is restored to its previous value.
    /// </summary>
    /// <exception cref="ConcurrencyException">
    /// The saga does not exist, its stored revision differs from <c>sagaData.Revision</c>,
    /// or the entity was modified between the revision check and the update.
    /// </exception>
    public async Task Update(ISagaData sagaData, IEnumerable<ISagaCorrelationProperty> correlationProperties)
    {
        await EnsureCreated();
        var stored = await LoadAndVerifyRevision(sagaData, "update");

        // The new revision must be part of the serialized entity, so it is bumped before the
        // write and rolled back in the finally block if the write does not succeed.
        sagaData.Revision++;
        var updated = false;
        try
        {
            var entity = ToTableEntity(sagaData, correlationProperties);
            await tableClient.UpdateEntityAsync(entity, stored.ETag, TableUpdateMode.Replace);
            updated = true;
        }
        catch (RequestFailedException ex) when (ex.Status == PreconditionFailedStatus || ex.Status == NotFoundStatus)
        {
            throw new ConcurrencyException($"Saga data with ID {sagaData.Id} was modified concurrently and could not be updated: {ex.Message}");
        }
        finally
        {
            if (!updated)
            {
                sagaData.Revision--;
            }
        }
    }

    /// <summary>
    /// Walks the dot-separated property <paramref name="path"/> starting at <paramref name="obj"/>.
    /// </summary>
    /// <returns>
    /// The value at the end of the path, or null when <paramref name="obj"/> is null, a segment
    /// names a property that does not exist, or an intermediate value is null.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="path"/> is null.</exception>
    public static object GetPropertyValue(object obj, string path)
    {
        if (path == null) throw new ArgumentNullException(nameof(path));

        var current = obj;
        foreach (var segment in path.Split('.'))
        {
            if (current == null)
            {
                return null;
            }

            var propertyInfo = current.GetType().GetProperty(segment);
            if (propertyInfo == null)
            {
                return null;
            }

            current = propertyInfo.GetValue(current);
        }

        return current;
    }

    /// <summary>
    /// Reads the stored revision and ETag of the saga and verifies that the stored revision
    /// equals the revision of <paramref name="sagaData"/>. <paramref name="operation"/> is
    /// only used in the error message ("update" / "delete").
    /// </summary>
    private async Task<TableEntity> LoadAndVerifyRevision(ISagaData sagaData, string operation)
    {
        TableEntity stored;
        try
        {
            var response = await tableClient.GetEntityAsync<TableEntity>(SagaPartitionKey, sagaData.Id.ToString(), RevisionSelect);
            stored = response?.Value;
        }
        catch (RequestFailedException ex) when (ex.Status == NotFoundStatus)
        {
            // GetEntityAsync reports a missing entity by throwing, not by returning null.
            stored = null;
        }

        if (stored == null)
        {
            throw new ConcurrencyException($"Saga data with ID {sagaData.Id} does not exist!");
        }

        // The revision is stored as a string column. As before, a missing or unparsable value
        // skips the comparison; the ETag passed to the write still guards against lost updates.
        if (Int32.TryParse(stored.GetString(RevisionProperty), out var storedRevision) && storedRevision != sagaData.Revision)
        {
            throw new ConcurrencyException($"Attempted to {operation} saga data with ID {sagaData.Id} with revision {sagaData.Revision}, but the existing data was updated to revision {storedRevision}");
        }

        return stored;
    }

    /// <summary>Escapes a value for use inside a single-quoted OData string literal.</summary>
    private static string EscapeFilterValue(string value)
    {
        return value.Replace("'", "''");
    }

    /// <summary>Builds the table entity (serialized saga plus indexed properties) for a saga.</summary>
    private static TableEntity ToTableEntity(ISagaData sagaData, IEnumerable<ISagaCorrelationProperty> correlationProperties, string partitionKey = SagaPartitionKey)
    {
        var entity = new TableEntity(partitionKey, sagaData.Id.ToString());
        entity.Add(SagaDataProperty, JsonConvert.SerializeObject(sagaData, SerializerSettings));

        foreach (var indexedProperty in GetPropertiesToIndex(sagaData, correlationProperties))
        {
            entity.Add(indexedProperty.Key, indexedProperty.Value);
        }

        return entity;
    }

    /// <summary>
    /// Returns the name/value pairs stored as separate columns: every correlation property
    /// with a non-null value, plus "SagaId" and "Revision". Values are stored as strings.
    /// </summary>
    private static IEnumerable<KeyValuePair<string, string>> GetPropertiesToIndex(ISagaData sagaData, IEnumerable<ISagaCorrelationProperty> correlationProperties)
    {
        var fixedProperties = new[]
        {
            new KeyValuePair<string, string>("SagaId", sagaData.Id.ToString()),
            new KeyValuePair<string, string>(RevisionProperty, sagaData.Revision.ToString())
        };

        return (correlationProperties ?? Enumerable.Empty<ISagaCorrelationProperty>())
            .Select(property => property.PropertyName)
            .Select(path => new KeyValuePair<string, string>(path, GetPropertyValue(sagaData, path)?.ToString()))
            .Where(kvp => kvp.Value != null)
            // Union (not Concat) drops exact duplicates, e.g. when "Revision" is itself a correlation property.
            .Union(fixedProperties)
            .ToList();
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment