Skip to content

Instantly share code, notes, and snippets.

Last active October 5, 2020 18:19
Show Gist options
  • Save dcolthorp/0ee87537624c596ccc1d72bffa73387b to your computer and use it in GitHub Desktop.
Save dcolthorp/0ee87537624c596ccc1d72bffa73387b to your computer and use it in GitHub Desktop.
Ref and Store redux-like implementation for c#
public delegate TResult RefFunc<TState, TResult>(ref TState current);
public delegate void RefAction<TState>(ref TState current);
public delegate void RefAction<TState, TEvent>(ref TState state, TEvent action);
namespace Framework
public static class RefPublishStrategies
public static void DefaultStrategy(Action act)
public static Action<Action> PublishWithScheduler(IScheduler scheduler)
return (Action act) => {
scheduler.Schedule(() => {
public class Ref<TState> where TState : struct
private TState _value;
private readonly Action<Action> _refPublishStrategy;
private readonly BehaviorSubject<StrongBox<TState>> _subject;
private readonly object _lock = new object();
private bool _handling = false;
private readonly ConcurrentDictionary<TaskCompletionSource<object>, bool> _completions = new ConcurrentDictionary<TaskCompletionSource<object>, bool>();
public Ref(TState start) : this(start, RefPublishStrategies.DefaultStrategy)
public Ref(TState start, Action<Action> refPublishStrategy)
_value = start;
_refPublishStrategy = refPublishStrategy;
_subject = new BehaviorSubject<StrongBox<TState>>(new StrongBox<TState>(_value));
public TState Value
get { lock (_lock) return _value; }
public Task SubmitAndPublishAsync(RefAction<TState> f)
var publish = new StrongBox<TState>();
lock (_lock)
if (_handling) throw new Exception("Handler resubmitted");
_handling = true;
try {
f(ref _value);
publish.Value = _value;
return PublishAction(() => {
return Task.CompletedTask;
} finally {
_handling = false;
public Task AllComplete => Task.WhenAll(_completions.Keys.Select(a => a.Task).ToArray());
public Task PublishAction(Func<Task> act)
var tcs = new TaskCompletionSource<object>();
_completions[tcs] = true;
_refPublishStrategy(async () =>
await act();
catch (Exception exception)
bool ignored;
if (!_completions.TryRemove(tcs, out ignored))
throw new Exception("Unable to remove completion handler!");
return tcs.Task;
public void Submit(RefAction<TState> f)
public IObservable<TResult> Observe<TResult>(RefFunc<TState, TResult> projection)
return _subject.Select(publish => _PerformProjection(ref publish.Value, projection));
public TResult Project<TResult>(RefFunc<TState, TResult> projection)
return _PerformProjection(ref _value, projection);
private TResult _PerformProjection<TResult>(ref TState state, RefFunc<TState, TResult> projection)
lock (_lock)
var original = state;
var ret = projection(ref state);
if (!state.Equals(original))
throw new ArgumentException("Observer mutated state!");
return ret;
catch (Exception e) when (ExceptionHandling.Never(e))
namespace UnitTests
interface IAction { }
struct DummyState
public int Count;
class SagaAction : IAction { }
class IncAction : IAction { }
class ExpectAndIncAction : IAction
public int Expected { get; }
public ExpectAndIncAction(int expected)
Expected = expected;
public class SagaTest
Store<IAction, DummyState> store;
static int Count(ref DummyState s)
return s.Count;
public void Setup()
store = new Store<IAction, DummyState>(
RefPublishStrategies.PublishWithScheduler(new EventLoopScheduler()));
public async Task TestSaga()
var tc1 = new TaskCompletionSource<object>();
var tc2 = new TaskCompletionSource<object>();
store.RegisterHandler((ref DummyState s, IncAction action) =>
s.Count += 1;
store.RegisterSaga<SagaAction>(async (store, evt) =>
await tc1.Task;
store.Dispatch(new IncAction());
await tc2.Task;
store.Dispatch(new IncAction());
Assert.That(store.Project(Count), Is.EqualTo(0));
var sagaTask = store.DispatchAndPublishAsync(new SagaAction());
await store.Observe(Count).Where(c => c == 1).FirstAsync();
Assert.That(store.Project(Count), Is.EqualTo(1));
await sagaTask;
Assert.That(store.Project(Count), Is.EqualTo(2));
public async Task SagasRunInParallel()
var tc1 = new TaskCompletionSource<object>();
var tc2 = new TaskCompletionSource<object>();
store.RegisterHandler((ref DummyState s, ExpectAndIncAction action) =>
Assert.AreEqual(action.Expected, s.Count);
s.Count += 1;
store.RegisterSaga<SagaAction>(async (store, evt) =>
store.Dispatch(new ExpectAndIncAction(0));
await tc2.Task;
store.Dispatch(new ExpectAndIncAction(2));
store.RegisterSaga<SagaAction>(async (store, evt) =>
await tc1.Task;
store.Dispatch(new ExpectAndIncAction(1));
Assert.That(store.Project(Count), Is.EqualTo(0));
store.Dispatch(new SagaAction());
await store.Observe(Count).Where(c => c == 1).FirstAsync();
await store.Observe(Count).Where(c => c == 2).FirstAsync();
await store.AllComplete;
Assert.That(store.Project(Count), Is.EqualTo(3));
public async Task TaskWaitsForAllToComplete()
store.RegisterHandler((ref DummyState s, ExpectAndIncAction action) =>
Assert.AreEqual(action.Expected, s.Count);
s.Count += 1;
store.RegisterSaga<SagaAction>((store, evt) =>
store.Dispatch(new ExpectAndIncAction(0));
return Task.CompletedTask;
store.RegisterSaga<SagaAction>(async (store, evt) =>
await Task.Delay(500);
store.Dispatch(new ExpectAndIncAction(1));
Assert.That(store.Project(Count), Is.EqualTo(0));
await store.DispatchAndPublishAsync(new SagaAction());
Assert.That(store.Project(Count), Is.EqualTo(2));
public async Task DispatchAndPublishAsyncThrowsSagaException()
store.RegisterHandler((ref DummyState s, ExpectAndIncAction action) =>
throw new InvalidCastException();
store.RegisterSaga<SagaAction>((store, evt) =>
store.Dispatch(new ExpectAndIncAction(0));
return Task.CompletedTask;
Assert.That(store.Project(Count), Is.EqualTo(0));
await store.DispatchAndPublishAsync(new SagaAction());
Assert.Fail("exception not thrown");
} catch (InvalidCastException)
// do nothing
public class Store<TActionBase, TState> where TState : struct
public delegate Task Saga(Store<TActionBase, TState> store, TActionBase evt);
protected readonly Ref<TState> _ref;
private readonly Dictionary<Type, List<RefAction<TState, TActionBase>>> _handlerMap;
private readonly List<RefAction<TState, TActionBase>> _globalBeforeHandlers;
private Dictionary<Type, List<Saga>> _sagaMap;
public Store(TState start)
_ref = new Ref<TState>(start);
_handlerMap = new Dictionary<Type, List<RefAction<TState, TActionBase>>>();
_sagaMap = new Dictionary<Type, List<Saga>>();
_globalBeforeHandlers = new List<RefAction<TState, TActionBase>>();
public Store(TState start, Action<Action> refPublishStrategy)
_ref = new Ref<TState>(start, refPublishStrategy);
_handlerMap = new Dictionary<Type, List<RefAction<TState, TActionBase>>>();
_sagaMap = new Dictionary<Type, List<Saga>>();
_globalBeforeHandlers = new List<RefAction<TState, TActionBase>>();
public void RegisterHandler<TEvent>(RefAction<TState, TEvent> f) where TEvent : TActionBase
var t = (typeof(TEvent));
List<RefAction<TState, TActionBase>> list;
if (!_handlerMap.TryGetValue(t, out list))
list = new List<RefAction<TState, TActionBase>>();
_handlerMap[t] = list;
RefAction<TState, TActionBase> wrappedHandler = (ref TState state, TActionBase e) =>
f(ref state, (TEvent)e);
public void RegisterGlobalHandler(RefAction<TState, TActionBase> f)
public void RegisterSaga<TEvent>(Saga f) where TEvent : TActionBase
var t = (typeof(TEvent));
List<Saga> list;
if (!_sagaMap.TryGetValue(t, out list))
list = new List<Saga>();
_sagaMap[t] = list;
Saga wrappedHandler = (state, e) =>
f(this, (TEvent)e);
public Task DispatchAndPublishAsync(TActionBase e)
var eventType = e.GetType();
List<RefAction<TState, TActionBase>> list;
IEnumerable<Task> tasks = new Task[0];
if (_handlerMap.TryGetValue(eventType, out list))
var publishTask = _ref.SubmitAndPublishAsync((ref TState state) =>
foreach (var beforeHandler in _globalBeforeHandlers)
beforeHandler(ref state, e);
foreach (var entry in list)
entry(ref state, e);
tasks = tasks.Concat(new[] { publishTask });
if (_sagaMap.ContainsKey(eventType))
var sagaTasks = _sagaMap[eventType]
.Select(f => _ref.PublishAction(() => f(this, e)));
tasks = tasks.Concat(sagaTasks);
return Task.WhenAll(tasks);
public void Dispatch(TActionBase e)
public Task AllComplete => _ref.AllComplete;
public IObservable<TResult> Observe<TResult>(RefFunc<TState, TResult> project) =>
public TResult Project<TResult>(RefFunc<TState, TResult> projection) =>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment