Skip to content

Instantly share code, notes, and snippets.

@jeremydmiller
Created February 12, 2014 18:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jeremydmiller/8962060 to your computer and use it in GitHub Desktop.
Save jeremydmiller/8962060 to your computer and use it in GitHub Desktop.
FubuTransportation saga concurrency proposal
using System;
using System.Runtime.Serialization;
namespace FubuTransportation.Sagas.Concurrency
{
/* NOTES
*
* Add a new interface called IStatefulSaga to make things easier
* Add a new interface called IConcurrencyRule that will throw SagaConcurrencyException to abort the current
* transaction and any queued-up outgoing messages if a concurrency violation is detected.
*
* We probably need to research RavenDb optimistic locking. That might actually be good enough for us
*
* - Modify the SagaBehavior to:
* 1.) Call IConcurrencyRule.AssertBeforeDoingWork() before calling the inner behavior
* 2.) Call IConcurrencyRule.AssertBeforeCommitting() after calling the inner behavior
* 3.) Wrap the call to the inner in a try/finally and always call IConcurrencyRule.Release() in the finally
* to make sure we release any offline pessimistic locks
*
* Say that IConcurrencyRule is optionally configured per saga state.
* Default is a Nullo
* Add a new extension to FubuTransportRegistry to register saga concurrency per state type?
* *gasp* -- Add an attribute on IStatefulSaga to do the same
*
*
* Maybe add a new policy that would wrap all Saga chains with an error policy on
* SagaConcurrencyException?
* Maybe expose that off a new FubuTransportRegistry.Saga.For<TState> hive?
* I would rather allow you to stick the saga rules directly on the SagaState somehow. I don't like the
* central repo.
* Need more thought on this one.
*/
// "T" is the SagaState
/// <summary>
/// Concurrency check for one kind of saga state. Per the design notes at the top of this file,
/// an implementation signals a violation by throwing SagaConcurrencyException.
/// </summary>
/// <typeparam name="T">The saga state type.</typeparam>
public interface IConcurrencyRule<T>
{
/// <summary>
/// Check that the design notes intend SagaBehavior to call before it invokes the inner behavior.
/// </summary>
/// <param name="state">The saga state being processed.</param>
void AssertBeforeDoingWork(T state);
/// <summary>
/// Check that the design notes intend SagaBehavior to call after the inner behavior has run.
/// </summary>
/// <param name="state">The saga state being processed.</param>
/// <param name="existingFinder">
/// Callback that supplies the state to compare against -- presumably the currently persisted copy;
/// confirm against the SagaBehavior implementation.
/// </param>
void AssertBeforeCommitting(T state, Func<T> existingFinder);
/// <summary>
/// Cleanup hook that the design notes intend to be called from a finally block, so that any
/// offline pessimistic lock is released.
/// </summary>
/// <param name="state">The saga state being processed.</param>
void Release(T state);
}
// This will be registered as the default
/// <summary>
/// Null-object implementation of <see cref="IConcurrencyRule{T}"/>: performs no checks and
/// holds no locks. Every member is a no-op and never throws.
/// </summary>
/// <typeparam name="T">The saga state type.</typeparam>
public class NulloConcurrencyRule<T> : IConcurrencyRule<T>
{
    /// <summary>Does nothing.</summary>
    /// <param name="state">Ignored.</param>
    public void AssertBeforeDoingWork(T state)
    {
        // nothing
    }

    /// <summary>Does nothing; <paramref name="existingFinder"/> is never invoked.</summary>
    /// <param name="state">Ignored.</param>
    /// <param name="existingFinder">Ignored.</param>
    public void AssertBeforeCommitting(T state, Func<T> existingFinder)
    {
        // nothing
    }

    /// <summary>Does nothing, because this rule never acquires a lock.</summary>
    /// <param name="state">Ignored.</param>
    public void Release(T state)
    {
        // nothing -- Release is meant to be called from a finally block, so throwing here
        // would fail every saga that uses this default rule and hide the handler's own exception
    }
}
/// <summary>
/// Saga state that exposes an identifier and a version marker. The pessimistic and optimistic
/// concurrency rules in this file constrain their state type to this interface.
/// </summary>
public interface IStatefulSaga
{
/// <summary>Identifier of the saga state.</summary>
Guid Id { get; }
// do we even care about having this be sequential vs Guid?
// TODO -- look at Raven's version support
/// <summary>
/// Version marker. OptimisticConcurrencyRule compares it for inequality only, so no ordering
/// between versions is required by the code in this file.
/// </summary>
Guid Version { get; }
}
/// <summary>
/// Placeholder for a concurrency rule backed by an offline pessimistic lock.
/// NOT IMPLEMENTED: AssertBeforeDoingWork and Release currently always throw
/// NotImplementedException; only AssertBeforeCommitting is callable (as a no-op).
/// </summary>
/// <typeparam name="T">The saga state type.</typeparam>
public class PessimisticConcurrencyRule<T> : IConcurrencyRule<T> where T : IStatefulSaga
{
/// <summary>Not implemented yet; always throws.</summary>
/// <exception cref="NotImplementedException">Always.</exception>
public void AssertBeforeDoingWork(T state)
{
// planned: try to write to some sort of offline pessimistic lock storage
throw new NotImplementedException();
}
/// <summary>Does nothing; <paramref name="existingFinder"/> is never invoked.</summary>
public void AssertBeforeCommitting(T state, Func<T> existingFinder)
{
// nothing
}
/// <summary>Not implemented yet; always throws.</summary>
/// <exception cref="NotImplementedException">Always.</exception>
public void Release(T state)
{
// planned: release the offline lock
throw new NotImplementedException();
}
}
/// <summary>
/// Optimistic concurrency rule: takes no lock up front and, just before committing, compares
/// the <see cref="IStatefulSaga.Version"/> of the state being processed against the version of
/// the state returned by the supplied finder.
/// </summary>
/// <typeparam name="T">The saga state type.</typeparam>
public class OptimisticConcurrencyRule<T> : IConcurrencyRule<T> where T : IStatefulSaga
{
    /// <summary>Does nothing; the optimistic check only happens at commit time.</summary>
    /// <param name="state">Ignored.</param>
    public void AssertBeforeDoingWork(T state)
    {
        // nothing
    }

    /// <summary>
    /// Throws if the state returned by <paramref name="existingFinder"/> carries a different
    /// version than <paramref name="state"/>.
    /// </summary>
    /// <param name="state">The saga state about to be committed.</param>
    /// <param name="existingFinder">Callback that returns the state to compare against.</param>
    /// <exception cref="ArgumentNullException"><paramref name="existingFinder"/> is null.</exception>
    /// <exception cref="SagaConcurrencyException">The two versions differ.</exception>
    public void AssertBeforeCommitting(T state, Func<T> existingFinder)
    {
        if (existingFinder == null)
        {
            throw new ArgumentNullException("existingFinder");
        }

        var existing = existingFinder();

        // The finder found nothing to compare against (e.g. the state has never been stored),
        // so there is no version to conflict with. Previously this path dereferenced null.
        // NOTE(review): if a null result can also mean "deleted by a concurrent handler",
        // this should be treated as a violation instead -- confirm against the saga repository.
        if (existing == null)
        {
            return;
        }

        if (existing.Version != state.Version)
        {
            throw new SagaConcurrencyException(string.Format(
                "Concurrency violation on saga state {0} with Id {1}: this handler was working against version {2}, but the existing state is at version {3}",
                typeof(T).Name, state.Id, state.Version, existing.Version));
        }
    }

    /// <summary>Does nothing, because this rule never acquires a lock.</summary>
    /// <param name="state">Ignored.</param>
    public void Release(T state)
    {
        // nothing
    }
}
/// <summary>
/// Thrown by a concurrency rule when it detects a concurrency violation on a saga state.
/// </summary>
[Serializable]
public class SagaConcurrencyException : Exception
{
    /// <summary>Creates the exception with the default message.</summary>
    // Added so the type offers the full set of standard exception constructors (CA1032).
    public SagaConcurrencyException()
    {
    }

    /// <summary>Creates the exception with an explanatory message.</summary>
    /// <param name="message">Description of the violation.</param>
    public SagaConcurrencyException(string message) : base(message)
    {
    }

    /// <summary>Creates the exception with an explanatory message and the underlying cause.</summary>
    /// <param name="message">Description of the violation.</param>
    /// <param name="innerException">The exception that caused this one.</param>
    public SagaConcurrencyException(string message, Exception innerException) : base(message, innerException)
    {
    }

    /// <summary>Deserialization constructor required by the [Serializable] exception pattern.</summary>
    /// <param name="info">Serialized object data.</param>
    /// <param name="context">Source/destination context of the serialization.</param>
    protected SagaConcurrencyException(SerializationInfo info, StreamingContext context) : base(info, context)
    {
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment