Skip to content

Instantly share code, notes, and snippets.

@fjeldstad
Last active October 6, 2015 20:21
Show Gist options
  • Save fjeldstad/8c9d002a0e1eb545261b to your computer and use it in GitHub Desktop.
Save fjeldstad/8c9d002a0e1eb545261b to your computer and use it in GitHub Desktop.
/// <summary>
/// Base class for a saga: a message-driven state machine over a state of type
/// <typeparamref name="TState"/>. Derived classes call <c>When</c> to describe how each
/// message type transforms the state, and <see cref="Connect"/> then subscribes all of
/// those handlers on a bus.
/// </summary>
/// <typeparam name="TState">The reference type holding the saga's state.</typeparam>
public abstract class Saga<TState> where TState : class
{
    // Upper bound on handling attempts for a single message (first attempt included).
    private const int MaxMessageHandlingRetryAttemptsPerMessage = 3;

    // Pause between two consecutive handling attempts of the same message.
    private const int MillisecondsBetweenMessageHandlingAttempts = 500;

    // Deferred subscriptions; each entry is executed with the bus and subscriber id given to Connect.
    private readonly List<Func<IBus, string, Task>> _registrations = new List<Func<IBus, string, Task>>();

    /// <summary>
    /// Registers a handler for <typeparamref name="TMessage"/>. Nothing is subscribed until
    /// <see cref="Connect"/> is called.
    /// </summary>
    /// <typeparam name="TMessage">The message type to handle.</typeparam>
    /// <param name="accessMemory">
    /// Resolves the memory (current state plus an overwrite callback) that belongs to the message.
    /// </param>
    /// <param name="transform">
    /// Computes the next state and the messages to publish from the incoming message and the current state.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="accessMemory"/> or <paramref name="transform"/> is <c>null</c>.
    /// </exception>
    protected void When<TMessage>(
        Func<TMessage, Task<IMemory>> accessMemory,
        Func<TMessage, TState, Task<ITransformResult>> transform) where TMessage : class
    {
        if (accessMemory == null)
        {
            throw new ArgumentNullException(nameof(accessMemory));
        }

        if (transform == null)
        {
            throw new ArgumentNullException(nameof(transform));
        }

        _registrations.Add((bus, subscriberId) => bus.RegisterSubscriber(subscriberId, async (TMessage msg) =>
        {
            await HandleWithRetries(bus, msg, accessMemory, transform).ConfigureAwait(false);
        }));
    }

    /// <summary>
    /// Called when handling of a message is given up after a <see cref="SagaMessageHandlingException"/>:
    /// either the exception did not ask for a retry or the maximum number of attempts was reached.
    /// The default implementation does nothing, so the message is dropped silently. Override it to
    /// log the failure or to throw; an exception thrown here propagates out of the subscriber callback.
    /// </summary>
    /// <typeparam name="TMessage">The type of the message that could not be handled.</typeparam>
    /// <param name="message">The message that could not be handled.</param>
    /// <param name="exception">The exception raised by the last attempt.</param>
    /// <param name="attempts">The number of attempts that were made.</param>
    protected virtual void OnMessageHandlingFailed<TMessage>(
        TMessage message,
        SagaMessageHandlingException exception,
        int attempts) where TMessage : class
    {
    }

    /// <summary>
    /// Subscribes every handler registered through <c>When</c> on the given bus, one after another.
    /// </summary>
    /// <param name="bus">The bus to subscribe on and to publish output messages to.</param>
    /// <param name="subscriberId">The subscriber id; it is trimmed before being passed to the bus.</param>
    /// <returns>A task that completes when all registrations have been awaited.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="bus"/> is <c>null</c>, or <paramref name="subscriberId"/> is <c>null</c>, empty or whitespace.
    /// </exception>
    public async Task Connect(IBus bus, string subscriberId)
    {
        if (bus == null)
        {
            throw new ArgumentNullException(nameof(bus));
        }

        if (string.IsNullOrWhiteSpace(subscriberId))
        {
            // ArgumentNullException is kept for whitespace-only ids so existing catch blocks keep working.
            throw new ArgumentNullException(nameof(subscriberId));
        }

        foreach (var registration in _registrations)
        {
            await registration(bus, subscriberId.Trim()).ConfigureAwait(false);
        }
    }

    // Runs HandleOnce until it succeeds, the exception declines a retry, or the attempt limit is hit.
    // Only SagaMessageHandlingException is handled here; any other exception propagates to the caller.
    private async Task HandleWithRetries<TMessage>(
        IBus bus,
        TMessage msg,
        Func<TMessage, Task<IMemory>> accessMemory,
        Func<TMessage, TState, Task<ITransformResult>> transform) where TMessage : class
    {
        for (var attempts = 1; ; attempts++)
        {
            try
            {
                await HandleOnce(bus, msg, accessMemory, transform).ConfigureAwait(false);
                return;
            }
            catch (SagaMessageHandlingException ex)
            {
                var retry = attempts < MaxMessageHandlingRetryAttemptsPerMessage && ex.Retry;
                if (!retry)
                {
                    // Giving up: surface the failure through the hook instead of discarding it unseen.
                    OnMessageHandlingFailed(msg, ex, attempts);
                    return;
                }

                if (MillisecondsBetweenMessageHandlingAttempts > 0)
                {
                    await Task.Delay(MillisecondsBetweenMessageHandlingAttempts).ConfigureAwait(false);
                }
            }
        }
    }

    // Performs a single handling attempt: load memory, transform, persist the next state, publish output.
    private async Task HandleOnce<TMessage>(
        IBus bus,
        TMessage msg,
        Func<TMessage, Task<IMemory>> accessMemory,
        Func<TMessage, TState, Task<ITransformResult>> transform) where TMessage : class
    {
        var memory = await accessMemory(msg).ConfigureAwait(false);
        if (memory == null)
        {
            throw new InvalidOperationException(
                $"Unable to access the memory of {GetType().Name} for {typeof(TMessage).Name} message.");
        }

        var result = await transform(msg, memory.State).ConfigureAwait(false);
        if (result == null)
        {
            // Without this guard a null result would fail below with an opaque NullReferenceException.
            throw new InvalidOperationException(
                $"The transform of {GetType().Name} returned no result for {typeof(TMessage).Name} message.");
        }

        var overwrite = memory.Overwrite;
        if (overwrite == null)
        {
            throw new InvalidOperationException(
                $"The memory of {GetType().Name} for {typeof(TMessage).Name} message has no overwrite callback.");
        }

        // The state is persisted before any output message is published.
        await overwrite(result.NextState).ConfigureAwait(false);

        // Null entries in the output are skipped; a null array is treated as "no messages".
        foreach (var outputMessage in (result.Messages ?? new object[0]).Where(x => x != null))
        {
            await bus.Publish(outputMessage).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// The saga state associated with a message, together with the callback that replaces it.
    /// </summary>
    public interface IMemory
    {
        /// <summary>The current state handed to the transform.</summary>
        TState State { get; }

        /// <summary>Callback invoked with the next state once the transform has completed.</summary>
        Func<TState, Task> Overwrite { get; }
    }

    /// <summary>
    /// Default <see cref="IMemory"/> implementation that stores the values given to its constructor.
    /// </summary>
    public class Memory : IMemory
    {
        /// <inheritdoc />
        public TState State { get; }

        /// <inheritdoc />
        public Func<TState, Task> Overwrite { get; }

        /// <summary>Creates a memory from a state and an overwrite callback.</summary>
        /// <param name="state">The current state.</param>
        /// <param name="overwrite">The callback that replaces the state.</param>
        public Memory(TState state, Func<TState, Task> overwrite)
        {
            State = state;
            Overwrite = overwrite;
        }
    }

    /// <summary>
    /// The outcome of a transform: the next state and the messages to publish.
    /// </summary>
    public interface ITransformResult
    {
        /// <summary>The state passed to <see cref="IMemory.Overwrite"/>.</summary>
        TState NextState { get; }

        /// <summary>The messages to publish on the bus; null entries are skipped.</summary>
        object[] Messages { get; }
    }

    /// <summary>
    /// Default <see cref="ITransformResult"/> implementation that stores the values given to its constructor.
    /// </summary>
    public class TransformResult : ITransformResult
    {
        /// <inheritdoc />
        public TState NextState { get; }

        /// <inheritdoc />
        public object[] Messages { get; }

        /// <summary>Creates a result from the next state and zero or more output messages.</summary>
        /// <param name="nextState">The next state.</param>
        /// <param name="messages">The output messages; <c>null</c> is replaced by an empty array.</param>
        public TransformResult(TState nextState, params object[] messages)
        {
            NextState = nextState;
            Messages = messages ?? new object[0];
        }
    }
}
/// <summary>
/// Exception that carries, in addition to the usual message and inner exception,
/// a flag stating whether the failed operation should be retried.
/// </summary>
public class SagaMessageHandlingException : Exception
{
    /// <summary>
    /// Initializes the exception with its description, retry flag and underlying cause.
    /// </summary>
    /// <param name="message">The error description forwarded to <see cref="Exception"/>.</param>
    /// <param name="retry">The value exposed through <see cref="Retry"/>.</param>
    /// <param name="innerException">The underlying cause forwarded to <see cref="Exception"/>.</param>
    public SagaMessageHandlingException(string message, bool retry, Exception innerException)
        : base(message, innerException)
    {
        this.Retry = retry;
    }

    /// <summary>
    /// Gets the retry flag that was supplied to the constructor.
    /// </summary>
    public bool Retry { get; }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment