Skip to content

Instantly share code, notes, and snippets.

@ekepes
Last active August 29, 2015 14:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ekepes/6940e5f70cbac25203ae to your computer and use it in GitHub Desktop.
Save ekepes/6940e5f70cbac25203ae to your computer and use it in GitHub Desktop.
using Automatonymous;
using Dapper;
using MassTransit.Exceptions;
using MassTransit.Logging;
using MassTransit.Pipeline;
using MassTransit.Saga;
using MassTransit.Util;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Linq;
using System.Transactions;
namespace MassTransit.DapperIntegration
{
/// <summary>
/// A saga repository implementation using Dapper and JSON (de)serialization.
/// All sagas are stored in a single table, e.g.:
/// <code>
/// CREATE TABLE Sagas
/// (
/// correlation_id UNIQUEIDENTIFIER NOT NULL PRIMARY KEY NONCLUSTERED,
/// saga_instance_type_name NVARCHAR(200) NOT NULL,
/// saga_instance_json NVARCHAR(MAX) NOT NULL,
/// )
/// </code>
/// The state machine instance is serialized and stored in the saga_instance_json
/// column when saving the saga, and deserialized when loading the saga.
/// </summary>
/// <typeparam name="TSaga"></typeparam>
public class DapperSagaRepository<TSaga> : ISagaRepository<TSaga>
where TSaga : class, ISaga
{
// Logger bound to this closed generic repository type.
private readonly ILog _log = Logger.Get<DapperSagaRepository<TSaga>>();
// Connection string used to open a fresh SqlConnection for every database call.
private readonly string _connectionString;
// Json.NET settings carrying the converters registered in the constructor.
private readonly JsonSerializerSettings _jsonSettings;
// Name of the saga table; it is formatted verbatim into every SQL statement.
private readonly string _sqlTableName;

/// <summary>
/// Creates a repository that stores sagas of type <typeparamref name="TSaga"/> as JSON
/// in the given table.
/// </summary>
/// <param name="connectionString">Connection string passed to every SqlConnection this repository opens.</param>
/// <param name="sqlTableName">
/// Name of the saga table. It is inserted into the SQL text as-is (a table name cannot be
/// a query parameter), so it must come from trusted configuration, never from user input.
/// </param>
/// <param name="stateMachine">State machine handed to the SagaStateJsonConverter registered for (de)serialization.</param>
/// <exception cref="ArgumentNullException"><paramref name="connectionString"/> is null.</exception>
/// <exception cref="ArgumentException"><paramref name="sqlTableName"/> is null, empty or whitespace.</exception>
public DapperSagaRepository(string connectionString, string sqlTableName, StateMachine<TSaga> stateMachine)
{
    // Fail fast: without these checks a bad value only surfaces later as a connection
    // or SQL syntax error on the first message that touches the repository.
    if (connectionString == null)
    {
        throw new ArgumentNullException("connectionString");
    }
    if (string.IsNullOrWhiteSpace(sqlTableName))
    {
        throw new ArgumentException("A saga table name is required.", "sqlTableName");
    }

    _connectionString = connectionString;
    _sqlTableName = sqlTableName;

    // Configure JSON serialization.
    _jsonSettings = new JsonSerializerSettings();
    _jsonSettings.Converters.Add(new SagaStateJsonConverter<TSaga>(stateMachine));
    _jsonSettings.Converters.Add(new SagaServiceBusConverter());
}
/// <summary>
/// Loads the saga with the given id and yields at most one action that runs the message
/// against it: a "create" action when no saga is stored and the policy allows creating one,
/// or a "use existing" action when a saga is stored and the policy allows using it.
/// When the policy allows neither, nothing is yielded and the decision is logged at debug level.
/// </summary>
/// <remarks>
/// This method is an iterator, so nothing runs until the result is enumerated. The
/// <see cref="TransactionScope"/> is opened inside the iterator and is still open while the
/// caller holds (and invokes) the yielded action; <c>Complete()</c> is only reached once the
/// caller advances the enumerator past that action. If the enumerator is disposed earlier,
/// for example because the action threw, the scope is disposed without being completed.
/// </remarks>
/// <typeparam name="TMessage">Type of the message being consumed.</typeparam>
/// <param name="context">Consume context handed to the policy checks.</param>
/// <param name="sagaId">Correlation id of the saga to load or create.</param>
/// <param name="selector">Produces the handlers to invoke for a saga instance and a consume context.</param>
/// <param name="policy">Decides whether an instance may be created, used, or removed.</param>
/// <returns>A lazily evaluated sequence containing zero or one action.</returns>
public IEnumerable<Action<IConsumeContext<TMessage>>> GetSaga<TMessage>(
    IConsumeContext<TMessage> context,
    Guid sagaId,
    InstanceHandlerSelector<TSaga, TMessage> selector,
    ISagaPolicy<TSaga, TMessage> policy)
    where TMessage : class
{
    var transactionOptions = new TransactionOptions { IsolationLevel = IsolationLevel.Serializable };
    using (var txScope = new TransactionScope(TransactionScopeOption.Required, transactionOptions))
    {
        var instance = GetSaga(sagaId);
        if (instance == null)
        {
            if (policy.CanCreateInstance(context))
            {
                yield return x =>
                {
                    if (_log.IsDebugEnabled)
                    {
                        _log.DebugFormat("SAGA: {0} Creating New {1} for {2}", typeof(TSaga).ToFriendlyName(),
                            sagaId, typeof(TMessage).ToFriendlyName());
                    }

                    try
                    {
                        var created = policy.CreateInstance(x, sagaId);
                        foreach (var callback in selector(created, x))
                        {
                            callback(x);
                        }

                        // A new saga that may already be removed is never written to the table.
                        if (!policy.CanRemoveInstance(created))
                        {
                            SaveSaga(created);
                        }
                    }
                    catch (Exception ex)
                    {
                        var sagaException = new SagaException("Create Saga Instance Exception", typeof(TSaga),
                            typeof(TMessage), sagaId, ex);
                        if (_log.IsErrorEnabled)
                        {
                            _log.Error("Saga exception!", sagaException);
                        }
                        throw sagaException;
                    }
                };
            }
            else
            {
                if (_log.IsDebugEnabled)
                {
                    _log.DebugFormat("SAGA: {0} Ignoring Missing {1} for {2}", typeof(TSaga).ToFriendlyName(),
                        sagaId, typeof(TMessage).ToFriendlyName());
                }
            }
        }
        else
        {
            if (policy.CanUseExistingInstance(context))
            {
                yield return x =>
                {
                    if (_log.IsDebugEnabled)
                    {
                        _log.DebugFormat("SAGA: {0} Using Existing {1} for {2}",
                            typeof(TSaga).ToFriendlyName(), sagaId,
                            typeof(TMessage).ToFriendlyName());
                    }

                    try
                    {
                        foreach (var callback in selector(instance, x))
                        {
                            callback(x);
                        }

                        // After the handlers ran, either drop the saga or persist its new state.
                        if (policy.CanRemoveInstance(instance))
                        {
                            DeleteSaga(instance);
                        }
                        else
                        {
                            // TODO: Is this the correct place to persist the saga again?
                            SaveSaga(instance);
                        }
                    }
                    catch (Exception ex)
                    {
                        var sagaException = new SagaException("Existing Saga Instance Exception", typeof(TSaga),
                            typeof(TMessage), sagaId, ex);
                        if (_log.IsErrorEnabled)
                        {
                            // Logged the same way as in the create path: message plus exception.
                            _log.Error("Saga exception!", sagaException);
                        }
                        throw sagaException;
                    }
                };
            }
            else
            {
                if (_log.IsDebugEnabled)
                {
                    _log.DebugFormat("SAGA: {0} Ignoring Existing {1} for {2}", typeof(TSaga).ToFriendlyName(),
                        sagaId,
                        typeof(TMessage).ToFriendlyName());
                }
            }
        }

        // Must stay the last statement of the scope: an early "yield break" above would
        // skip it and the scope would be disposed without completing.
        txScope.Complete();
    }
}
/// <summary>
/// Loads the saga stored under the given correlation id and deserializes it from JSON.
/// </summary>
/// <param name="sagaCorrelationId">Correlation id (primary key) of the saga row.</param>
/// <returns>The deserialized saga, or <c>null</c> when no row with that id exists.</returns>
private TSaga GetSaga(Guid sagaCorrelationId)
{
    // Only the JSON column is needed, so it is read as a plain string instead of a dynamic row.
    var sql = string.Format("SELECT saga_instance_json FROM {0} WHERE correlation_id = @correlation_id", _sqlTableName);
    using (var connection = new SqlConnection(_connectionString))
    {
        connection.Open();
        // SingleOrDefault: at most one row is expected for a correlation id; null means "not found".
        string json = connection.Query<string>(sql, new { correlation_id = sagaCorrelationId }).SingleOrDefault();
        if (json == null)
        {
            return null;
        }
        return JsonConvert.DeserializeObject<TSaga>(json, _jsonSettings);
    }
}
/// <summary>
/// Inserts the saga, or updates its stored JSON when a row with the same correlation id
/// already exists (single MERGE statement).
/// </summary>
/// <remarks>
/// On update only <c>saga_instance_json</c> is rewritten; <c>saga_instance_type_name</c> is
/// written when the row is first inserted and left untouched afterwards.
/// </remarks>
/// <param name="saga">The saga to serialize and persist.</param>
private void SaveSaga(TSaga saga)
{
    // "tgt" is the saga table being merged into, "src" is the single row of incoming values.
    var sql = string.Format(@"
MERGE INTO {0} AS tgt
USING (
    VALUES (@correlation_id, @saga_instance_type_name, @saga_instance_json)
) AS src (correlation_id, saga_instance_type_name, saga_instance_json)
ON tgt.correlation_id = src.correlation_id
WHEN MATCHED THEN
    UPDATE SET saga_instance_json = src.saga_instance_json
WHEN NOT MATCHED THEN
    INSERT (correlation_id, saga_instance_type_name, saga_instance_json)
    VALUES (src.correlation_id, src.saga_instance_type_name, src.saga_instance_json);", _sqlTableName);

    // Serialize before opening the connection so a serialization failure never holds one open.
    var json = JsonConvert.SerializeObject(saga, _jsonSettings);
    using (var connection = new SqlConnection(_connectionString))
    {
        connection.Open();
        connection.Execute(sql, new
        {
            correlation_id = saga.CorrelationId,
            saga_instance_type_name = saga.GetType().FullName,
            saga_instance_json = json
        });
    }
}
/// <summary>
/// Deletes the row whose correlation id matches the given saga.
/// </summary>
/// <param name="saga">The saga whose <c>CorrelationId</c> identifies the row to delete.</param>
private void DeleteSaga(TSaga saga)
{
    var deleteStatement = string.Format("DELETE FROM {0} WHERE correlation_id = @CorrelationId", _sqlTableName);
    using (var sqlConnection = new SqlConnection(_connectionString))
    {
        sqlConnection.Open();
        // The anonymous property name must match the @CorrelationId placeholder above.
        var parameters = new { CorrelationId = saga.CorrelationId };
        sqlConnection.Execute(deleteStatement, parameters);
    }
}
/// <summary>
/// Returns the correlation ids of all sagas matching the filter.
/// </summary>
/// <param name="filter">Filter describing which sagas to match.</param>
/// <returns>
/// The result of the two-argument <c>Where</c> overload, called with a projection that
/// picks each saga's <c>CorrelationId</c>.
/// </returns>
public IEnumerable<Guid> Find(ISagaFilter<TSaga> filter)
{
    Func<TSaga, Guid> toCorrelationId = saga => saga.CorrelationId;
    return Where(filter, toCorrelationId);
}
/// <summary>
/// Finds the sagas that match the filter.
/// </summary>
/// <remarks>
/// The filter expression is not translated to SQL. Every saga in the table is loaded and
/// deserialized through <c>Select</c>, then filtered in memory with the compiled
/// expression, so the cost grows with the size of the table.
/// TODO: translate the expression to SQL if this becomes a bottleneck.
/// </remarks>
/// <param name="filter">Filter whose <c>FilterExpression</c> selects the sagas to return.</param>
/// <returns>A materialized list of the matching sagas.</returns>
/// <exception cref="ArgumentNullException"><paramref name="filter"/> is null.</exception>
public IEnumerable<TSaga> Where(ISagaFilter<TSaga> filter)
{
    if (filter == null)
    {
        throw new ArgumentNullException("filter");
    }

    // NOTE(review): assumes ISagaFilter<TSaga> exposes FilterExpression as
    // Expression<Func<TSaga, bool>>, as in MassTransit 2.x — confirm against the referenced version.
    Func<TSaga, bool> predicate = filter.FilterExpression.Compile();

    // Select(...) already returns a materialized list read inside its own transaction scope;
    // the trailing Where/ToList are plain LINQ-to-Objects calls.
    return Select(saga => saga).Where(predicate).ToList();
}
/// <summary>
/// Finds the sagas matching the filter and projects each of them with the transformer.
/// </summary>
/// <typeparam name="TResult">Type produced by the transformer.</typeparam>
/// <param name="filter">Filter describing which sagas to match.</param>
/// <param name="transformer">Projection applied to every matching saga.</param>
/// <returns>A lazily evaluated projection over the sagas returned by the single-argument <c>Where</c>.</returns>
public IEnumerable<TResult> Where<TResult>(ISagaFilter<TSaga> filter, Func<TSaga, TResult> transformer)
{
    IEnumerable<TSaga> matchingSagas = Where(filter);
    return matchingSagas.Select(transformer);
}
/// <summary>
/// Loads every saga in the table, deserializes it, applies the transformer and returns the results.
/// </summary>
/// <typeparam name="TResult">Type produced by the transformer.</typeparam>
/// <param name="transformer">Projection applied to every deserialized saga.</param>
/// <returns>A materialized list with one transformed item per stored saga.</returns>
/// <exception cref="ArgumentNullException"><paramref name="transformer"/> is null.</exception>
public IEnumerable<TResult> Select<TResult>(Func<TSaga, TResult> transformer)
{
    // Checked up front so a missing transformer is reported before a connection is opened.
    if (transformer == null)
    {
        throw new ArgumentNullException("transformer");
    }

    // Only the JSON column is needed, so rows are read as plain strings instead of dynamic objects.
    var sql = string.Format("SELECT saga_instance_json FROM {0}", _sqlTableName);

    // TODO: Transaction logic taken from the NHibernateSagaRepository.
    // Why is this needed? From my understanding, the transformations are not persisted.
    var transactionOptions = new TransactionOptions { IsolationLevel = IsolationLevel.Serializable };
    using (var txScope = new TransactionScope(TransactionScopeOption.Required, transactionOptions))
    using (var connection = new SqlConnection(_connectionString))
    {
        connection.Open();
        // ToList() forces deserialization and transformation to run while the connection
        // and the transaction scope are still open.
        var transformedSagas = connection.Query<string>(sql)
            .Select(json => JsonConvert.DeserializeObject<TSaga>(json, _jsonSettings))
            .Select(transformer)
            .ToList();
        txScope.Complete();
        return transformedSagas;
    }
}
}
/// <summary>
/// JSON converter for <see cref="State"/> values: a state is written as its name and read
/// back by looking that name up on the state machine.
/// </summary>
/// <typeparam name="TSaga">Saga instance type of the state machine.</typeparam>
public class SagaStateJsonConverter<TSaga> : JsonConverter
    where TSaga : class, ISaga
{
    // State machine used to turn a stored state name back into a State.
    private readonly StateMachine<TSaga> _stateMachine;

    /// <summary>
    /// Creates a converter that resolves state names against the given state machine.
    /// </summary>
    /// <param name="stateMachine">State machine queried through <c>GetState</c> when reading.</param>
    public SagaStateJsonConverter(StateMachine<TSaga> stateMachine)
    {
        _stateMachine = stateMachine;
    }

    /// <summary>
    /// Writes the name of the state, or a JSON null when no state is given.
    /// </summary>
    public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
    {
        if (value == null)
        {
            writer.WriteNull();
            return;
        }

        serializer.Serialize(writer, ((State)value).Name);
    }

    /// <summary>
    /// Reads a state name and resolves it through the state machine.
    /// </summary>
    /// <returns>The resolved state, or <c>null</c> when the stored value is a JSON null.</returns>
    public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
    {
        var stateName = (string)serializer.Deserialize(reader, typeof(string));

        // A JSON null means "no state stored"; do not ask the state machine to resolve a null name.
        if (stateName == null)
        {
            return null;
        }

        return _stateMachine.GetState(stateName);
    }

    /// <summary>
    /// Handles <see cref="State"/> and every type assignable to it.
    /// </summary>
    public override bool CanConvert(Type objectType)
    {
        return typeof(State).IsAssignableFrom(objectType);
    }
}
/// <summary>
/// JSON converter that keeps <see cref="IServiceBus"/> values out of the stored saga:
/// they are always written as JSON null and always read back as <c>null</c>.
/// </summary>
/// <remarks>
/// Open question from the original author: no usage of the service bus property was found,
/// so it is dropped during (de)serialization. Is that a problem, and is there a better way
/// to ignore it than decorating the saga class with a JsonIgnore attribute?
/// </remarks>
public class SagaServiceBusConverter : JsonConverter
{
    /// <summary>
    /// Writes a JSON null regardless of the value.
    /// </summary>
    public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
    {
        writer.WriteNull();
    }

    /// <summary>
    /// Discards the stored value and returns <c>null</c>.
    /// </summary>
    public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
    {
        // Consume the whole value. For the null this converter writes, Skip() does nothing,
        // but if the stored JSON ever holds an object or array here, leaving it unread would
        // let its children be parsed as if they were properties of the saga.
        reader.Skip();
        return null;
    }

    /// <summary>
    /// Handles <see cref="IServiceBus"/> and every type assignable to it.
    /// </summary>
    public override bool CanConvert(Type objectType)
    {
        return typeof(IServiceBus).IsAssignableFrom(objectType);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment