Last active May 6, 2021 10:15
using Azure;
using Azure.Data.Tables;
using Newtonsoft.Json;
using Rebus.Exceptions;
using Rebus.Sagas;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Be.Vlaanderen.Pbs.Runner.Engine.Worker
public class TableStorageSagaStorage : ISagaStorage
private TableClient tableClient;
private const string partitionKey = "Saga";
private const string IdPropertyName = nameof(ISagaData.Id);
public TableStorageSagaStorage(string connectionString, string tableName = "SagaData") : this(new TableClient(connectionString, tableName))
public TableStorageSagaStorage(TableClient tableClient)
this.tableClient = tableClient;
public async Task EnsureCreated() {
await tableClient.CreateIfNotExistsAsync();
public async Task Delete(ISagaData sagaData)
var currentRevision = sagaData.Revision;
var currentData = await tableClient.GetEntityAsync<TableEntity>(partitionKey, sagaData.Id.ToString(), new[] { "Revision", "ETag" });
if (currentData == null)
throw new ConcurrencyException($"Saga data with ID {sagaData.Id} does not exist!");
if (Int32.TryParse(currentData.Value.GetString("Revision"), out var storedRevision) && currentRevision != storedRevision)
throw new ConcurrencyException($"Attempted to update saga data with ID {sagaData.Id} with revision {sagaData.Revision}, but the existing data was updated to revision {currentData.Value.GetInt32("Revision")}");
sagaData.Revision++; // Needed to be compliant with the tests.
//await EnsureCreated();
await tableClient.DeleteEntityAsync(partitionKey, sagaData.Id.ToString());
public async Task<ISagaData> Find(Type sagaDataType, string propertyName, object propertyValue)
await EnsureCreated();
TableEntity entity = default;
if (propertyName.Equals(IdPropertyName, StringComparison.InvariantCultureIgnoreCase))
string sagaId = propertyValue is string
? (string)propertyValue
: ((Guid)propertyValue).ToString();
entity = tableClient.Query<TableEntity>(filter: $"{nameof(TableEntity.RowKey)} eq '{propertyValue?.ToString() ?? ""}'", select: new[] { "SagaData" }).SingleOrDefault();
else {
entity = tableClient.Query<TableEntity>(filter: $"{propertyName} eq '{propertyValue?.ToString() ?? ""}'", select: new[] { "SagaData" }).SingleOrDefault();
if (entity != null && entity.TryGetValue("SagaData", out object data))
var sagaData = JsonConvert.DeserializeObject(data.ToString(), sagaDataType, new JsonSerializerSettings() { TypeNameHandling = TypeNameHandling.All });
return sagaData as ISagaData;
return null;
public async Task Insert(ISagaData sagaData, IEnumerable<ISagaCorrelationProperty> correlationProperties)
await EnsureCreated();
if (sagaData.Id == Guid.Empty)
throw new InvalidOperationException($"Saga data {sagaData.GetType()} has an uninitialized Id property!");
if (sagaData.Revision != 0)
throw new InvalidOperationException($"Attempted to insert saga data with ID {sagaData.Id} and revision {sagaData.Revision}, but revision must be 0 on first insert!");
/*var filter = correlationProperties
.Select(p => p.PropertyName)
.Select(path =>
var value = GetPropertyValue(sagaData, path);
if (value == null)
return null;
return $"{path} eq '{value.ToString()}'";
//return new KeyValuePair<string, string>(path, value != null ? value.ToString() : null);
new[] { $"SagaId eq '{sagaData.Id}'" }
var exists = tableClient.Query<TableEntity>(filter: string.Join(" or ", filter)).Any();
if (exists) {
throw new ConcurrencyException($"Saga data already exists!");
var entity = ToTableEntity(sagaData, correlationProperties);
await tableClient.AddEntityAsync(entity);
public async Task Update(ISagaData sagaData, IEnumerable<ISagaCorrelationProperty> correlationProperties)
await EnsureCreated();
var currentRevision = sagaData.Revision;
var currentData = await tableClient.GetEntityAsync<TableEntity>(partitionKey, sagaData.Id.ToString(), new[] { "Revision", "ETag" });
if (currentData == null)
throw new ConcurrencyException($"Saga data with ID {sagaData.Id} does not exist!");
if (Int32.TryParse(currentData.Value.GetString("Revision"), out var storedRevision) && currentRevision != storedRevision)
throw new ConcurrencyException($"Attempted to update saga data with ID {sagaData.Id} with revision {sagaData.Revision}, but the existing data was updated to revision {storedRevision}");
// Increment Revision
var entity = ToTableEntity(sagaData, correlationProperties);
await tableClient.UpdateEntityAsync(entity, currentData.Value.ETag, TableUpdateMode.Replace);
catch {
private static TableEntity ToTableEntity(ISagaData sagaData, IEnumerable<ISagaCorrelationProperty> correlationProperties, string partitionKey = partitionKey) {
var entity = new TableEntity(partitionKey, sagaData.Id.ToString());
var indexedProperties = GetPropertiesToIndex(sagaData, correlationProperties);
entity.Add("SagaData", JsonConvert.SerializeObject(sagaData, new JsonSerializerSettings() { TypeNameHandling = TypeNameHandling.All }));
foreach (var indexedProperty in indexedProperties)
entity.Add(indexedProperty.Key, indexedProperty.Value);
return entity;
public static object GetPropertyValue(object obj, string path)
var dots = path.Split('.');
foreach (var dot in dots)
var propertyInfo = obj.GetType().GetProperty(dot);
if (propertyInfo == null) return null;
obj = propertyInfo.GetValue(obj, new object[0]);
if (obj == null) break;
return obj;
private static IEnumerable<KeyValuePair<string, string>> GetPropertiesToIndex(ISagaData sagaData, IEnumerable<ISagaCorrelationProperty> correlationProperties)
return correlationProperties
.Select(p => p.PropertyName)
.Select(path =>
var value = GetPropertyValue(sagaData, path);
return new KeyValuePair<string, string>(path, value != null ? value.ToString() : null);
.Where(kvp => kvp.Value != null)
.Union(new [] {
//new KeyValuePair<string, string>("Id", sagaData.Id.ToString()),
new KeyValuePair<string, string>("SagaId", sagaData.Id.ToString()),
new KeyValuePair<string, string>("Revision", sagaData.Revision.ToString())
