Skip to content

Instantly share code, notes, and snippets.

@dcolthorp
Last active October 5, 2020 18:19
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dcolthorp/0ee87537624c596ccc1d72bffa73387b to your computer and use it in GitHub Desktop.
Save dcolthorp/0ee87537624c596ccc1d72bffa73387b to your computer and use it in GitHub Desktop.
Ref and Store: a Redux-like implementation for C#
/// <summary>
/// A function that receives a state value by reference and computes a result from it.
/// This is the projection type accepted by <c>Ref&lt;TState&gt;.Observe</c> and
/// <c>Ref&lt;TState&gt;.Project</c>.
/// </summary>
/// <typeparam name="TState">Type of the state passed by reference.</typeparam>
/// <typeparam name="TResult">Type of the computed result.</typeparam>
/// <param name="current">The state, passed by reference.</param>
public delegate TResult RefFunc<TState, TResult>(ref TState current);
/// <summary>
/// An action that receives a state value by reference so it can modify it in place.
/// This is the mutation type accepted by <c>Ref&lt;TState&gt;.SubmitAndPublishAsync</c>
/// and <c>Ref&lt;TState&gt;.Submit</c>.
/// </summary>
/// <typeparam name="TState">Type of the state passed by reference.</typeparam>
/// <param name="current">The state, passed by reference.</param>
public delegate void RefAction<TState>(ref TState current);
/// <summary>
/// An action that receives a state value by reference together with an event/action object.
/// This is the handler type accepted by <c>Store.RegisterHandler</c> and
/// <c>Store.RegisterGlobalHandler</c>.
/// </summary>
/// <typeparam name="TState">Type of the state passed by reference.</typeparam>
/// <typeparam name="TEvent">Type of the event/action object delivered to the handler.</typeparam>
/// <param name="state">The state, passed by reference.</param>
/// <param name="action">The event/action object being handled.</param>
public delegate void RefAction<TState, TEvent>(ref TState state, TEvent action);
namespace Framework
{
/// <summary>
/// Ready-made publish strategies. A strategy is an <c>Action&lt;Action&gt;</c>: it is handed
/// a unit of work and decides how that work gets executed.
/// </summary>
public static class RefPublishStrategies
{
    /// <summary>
    /// Executes the supplied work immediately on the calling thread.
    /// </summary>
    /// <param name="act">The work to execute.</param>
    public static void DefaultStrategy(Action act) => act();

    /// <summary>
    /// Builds a strategy that hands each unit of work to <paramref name="scheduler"/>
    /// via its <c>Schedule</c> method instead of executing it inline.
    /// </summary>
    /// <param name="scheduler">The scheduler that will receive the work.</param>
    /// <returns>A strategy delegate bound to <paramref name="scheduler"/>.</returns>
    public static Action<Action> PublishWithScheduler(IScheduler scheduler)
    {
        // The handle returned by Schedule is intentionally not kept.
        Action<Action> viaScheduler = work => scheduler.Schedule(() => work());
        return viaScheduler;
    }
}
/// <summary>
/// A cell holding a value-type state. Mutations are applied in place through
/// <c>SubmitAndPublishAsync</c>, and a snapshot of each resulting state is pushed to
/// observers through the publish strategy supplied at construction.
/// </summary>
/// <typeparam name="TState">The state type; must be a struct so snapshots are copies.</typeparam>
public class Ref<TState> where TState : struct
{
    private TState _value;
    private readonly Action<Action> _refPublishStrategy;

    // Seeded with the initial state; each published snapshot is boxed in a StrongBox so
    // projections can take it by reference.
    private readonly BehaviorSubject<StrongBox<TState>> _subject;

    private readonly object _lock = new object();

    // True while a mutation delegate is executing. Because lock is re-entrant for the
    // owning thread, this flag is what detects a handler submitting from inside a handler.
    private bool _handling = false;

    // Completion sources of publish actions that have been started but not yet finished.
    private readonly ConcurrentDictionary<TaskCompletionSource<object>, bool> _completions =
        new ConcurrentDictionary<TaskCompletionSource<object>, bool>();

    /// <summary>
    /// Creates a ref that publishes inline using <c>RefPublishStrategies.DefaultStrategy</c>.
    /// </summary>
    /// <param name="start">The initial state.</param>
    public Ref(TState start) : this(start, RefPublishStrategies.DefaultStrategy)
    {
    }

    /// <summary>
    /// Creates a ref with an explicit publish strategy.
    /// </summary>
    /// <param name="start">The initial state.</param>
    /// <param name="refPublishStrategy">Decides how each publish action is executed.</param>
    public Ref(TState start, Action<Action> refPublishStrategy)
    {
        _value = start;
        _refPublishStrategy = refPublishStrategy;
        _subject = new BehaviorSubject<StrongBox<TState>>(new StrongBox<TState>(_value));
    }

    /// <summary>Returns a copy of the current state, read under the lock.</summary>
    public TState Value
    {
        get { lock (_lock) return _value; }
    }

    /// <summary>
    /// Applies <paramref name="f"/> to the state in place and then publishes a snapshot of
    /// the result to observers through the publish strategy.
    /// </summary>
    /// <param name="f">The mutation to apply to the state.</param>
    /// <returns>A task that completes once the snapshot has been pushed to the subject.</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown when called again while a mutation delegate is still executing.
    /// </exception>
    public Task SubmitAndPublishAsync(RefAction<TState> f)
    {
        var publish = new StrongBox<TState>();
        lock (_lock)
        {
            if (_handling) throw new InvalidOperationException("Handler resubmitted");
            _handling = true;
            try
            {
                f(ref _value);
                // Snapshot the state now so later mutations cannot alter what is published.
                publish.Value = _value;
                // The publish action is handed to the strategy while the lock is still held.
                return PublishAction(() =>
                {
                    _subject.OnNext(publish);
                    return Task.CompletedTask;
                });
            }
            finally
            {
                _handling = false;
            }
        }
    }

    /// <summary>
    /// A task that completes when every publish action that is in flight at the moment
    /// this property is read has finished.
    /// </summary>
    public Task AllComplete => Task.WhenAll(_completions.Keys.Select(a => a.Task).ToArray());

    /// <summary>
    /// Runs <paramref name="act"/> through the publish strategy and tracks it so that
    /// <c>AllComplete</c> can wait for it.
    /// </summary>
    /// <param name="act">The asynchronous work to run.</param>
    /// <returns>
    /// A task that completes when <paramref name="act"/> completes, or faults with the
    /// exception it threw.
    /// </returns>
    public Task PublishAction(Func<Task> act)
    {
        var tcs = new TaskCompletionSource<object>();
        _completions[tcs] = true;
        try
        {
            // The lambda is converted to Action, i.e. it is "async void": nothing can
            // observe an exception escaping it, so every outcome is routed into tcs and
            // the bookkeeping lives in finally instead of throwing afterwards.
            _refPublishStrategy(async () =>
            {
                try
                {
                    await act();
                    tcs.SetResult(null);
                }
                catch (Exception exception)
                {
                    tcs.SetException(exception);
                }
                finally
                {
                    bool ignored;
                    _completions.TryRemove(tcs, out ignored);
                }
            });
        }
        catch
        {
            // The strategy itself failed (e.g. it could not schedule the work). Drop the
            // tracking entry so AllComplete does not wait forever on a task that will
            // never finish, then let the caller see the failure.
            bool ignored;
            _completions.TryRemove(tcs, out ignored);
            throw;
        }
        return tcs.Task;
    }

    /// <summary>
    /// Fire-and-forget variant of <c>SubmitAndPublishAsync</c>: the publish task is discarded.
    /// Exceptions thrown by <paramref name="f"/> itself still propagate to the caller.
    /// </summary>
    /// <param name="f">The mutation to apply to the state.</param>
    public void Submit(RefAction<TState> f)
    {
        SubmitAndPublishAsync(f);
    }

    /// <summary>
    /// Returns an observable that applies <paramref name="projection"/> to every
    /// published snapshot.
    /// </summary>
    /// <param name="projection">Computes the observed value from a snapshot.</param>
    public IObservable<TResult> Observe<TResult>(RefFunc<TState, TResult> projection)
    {
        return _subject.Select(publish => _PerformProjection(ref publish.Value, projection));
    }

    /// <summary>
    /// Applies <paramref name="projection"/> to the current state and returns the result.
    /// </summary>
    /// <param name="projection">Computes the result from the current state.</param>
    public TResult Project<TResult>(RefFunc<TState, TResult> projection)
    {
        return _PerformProjection(ref _value, projection);
    }

    /// <summary>
    /// Runs a projection under the lock. In DEBUG builds it also verifies that the
    /// projection did not modify the state it was given.
    /// </summary>
    private TResult _PerformProjection<TResult>(ref TState state, RefFunc<TState, TResult> projection)
    {
        lock (_lock)
        {
#if DEBUG
            var original = state;
#endif
            try
            {
                var ret = projection(ref state);
#if DEBUG
                if (!state.Equals(original))
                {
                    throw new ArgumentException("Observer mutated state!");
                }
#endif
                return ret;
            }
            // ExceptionHandling is defined outside this file.
            // NOTE(review): presumably Never(e) always returns false, so this filter only
            // gives a hook that sees the exception before the stack unwinds - confirm.
            catch (Exception e) when (ExceptionHandling.Never(e))
            {
                throw;
            }
        }
    }
}
}
namespace UnitTests
{
/// <summary>Marker interface implemented by every action used in these tests.</summary>
internal interface IAction
{
}

/// <summary>Minimal state used by the tests: a single counter.</summary>
internal struct DummyState
{
    public int Count;
}

/// <summary>Action type the tests register sagas for.</summary>
internal class SagaAction : IAction
{
}

/// <summary>Action type whose handler in the tests increments the counter.</summary>
internal class IncAction : IAction
{
}

/// <summary>
/// Action carrying the counter value its handler is expected to see before incrementing.
/// </summary>
internal class ExpectAndIncAction : IAction
{
    private readonly int _expected;

    public ExpectAndIncAction(int expected)
    {
        _expected = expected;
    }

    /// <summary>The value supplied at construction.</summary>
    public int Expected => _expected;
}
/// <summary>
/// Tests for saga registration and dispatch on <c>Store</c>, run against a store whose
/// publish strategy schedules work on an <c>EventLoopScheduler</c>.
/// </summary>
[TestFixture]
public class SagaTest
{
    Store<IAction, DummyState> store;

    // Kept so it can be disposed after each test; a new one is created per test.
    EventLoopScheduler scheduler;

    /// <summary>Projection returning the counter from the state.</summary>
    static int Count(ref DummyState s)
    {
        return s.Count;
    }

    /// <summary>Creates a fresh store and scheduler for every test.</summary>
    [SetUp]
    public void Setup()
    {
        scheduler = new EventLoopScheduler();
        store = new Store<IAction, DummyState>(
            default(DummyState),
            RefPublishStrategies.PublishWithScheduler(scheduler));
    }

    /// <summary>
    /// Disposes the per-test scheduler so each test does not leave its scheduler alive.
    /// </summary>
    [TearDown]
    public void TearDown()
    {
        scheduler?.Dispose();
        scheduler = null;
    }

    /// <summary>
    /// A saga that dispatches twice, gated by two externally completed tasks, advances
    /// the counter step by step and the dispatch task completes after the second step.
    /// </summary>
    [Test]
    public async Task TestSaga()
    {
        var tc1 = new TaskCompletionSource<object>();
        var tc2 = new TaskCompletionSource<object>();
        store.RegisterHandler((ref DummyState s, IncAction action) =>
        {
            s.Count += 1;
        });
        store.RegisterSaga<SagaAction>(async (store, evt) =>
        {
            await tc1.Task;
            store.Dispatch(new IncAction());
            await tc2.Task;
            store.Dispatch(new IncAction());
        });
        Assert.That(store.Project(Count), Is.EqualTo(0));
        var sagaTask = store.DispatchAndPublishAsync(new SagaAction());
        tc1.SetResult(null);
        await store.Observe(Count).Where(c => c == 1).FirstAsync();
        Assert.That(store.Project(Count), Is.EqualTo(1));
        tc2.SetResult(null);
        await sagaTask;
        Assert.That(store.Project(Count), Is.EqualTo(2));
    }

    /// <summary>
    /// Two sagas registered for the same action interleave: each handler invocation
    /// asserts the counter value it expects to see before incrementing it.
    /// </summary>
    [Test]
    public async Task SagasRunInParallel()
    {
        var tc1 = new TaskCompletionSource<object>();
        var tc2 = new TaskCompletionSource<object>();
        store.RegisterHandler((ref DummyState s, ExpectAndIncAction action) =>
        {
            Assert.AreEqual(action.Expected, s.Count);
            s.Count += 1;
        });
        store.RegisterSaga<SagaAction>(async (store, evt) =>
        {
            store.Dispatch(new ExpectAndIncAction(0));
            await tc2.Task;
            store.Dispatch(new ExpectAndIncAction(2));
        });
        store.RegisterSaga<SagaAction>(async (store, evt) =>
        {
            await tc1.Task;
            store.Dispatch(new ExpectAndIncAction(1));
        });
        Assert.That(store.Project(Count), Is.EqualTo(0));
        store.Dispatch(new SagaAction());
        await store.Observe(Count).Where(c => c == 1).FirstAsync();
        tc1.SetResult(null);
        await store.Observe(Count).Where(c => c == 2).FirstAsync();
        tc2.SetResult(null);
        await store.AllComplete;
        Assert.That(store.Project(Count), Is.EqualTo(3));
    }

    /// <summary>
    /// The task returned by <c>DispatchAndPublishAsync</c> does not complete until every
    /// saga for the action has finished, including one that finishes after a delay.
    /// </summary>
    [Test]
    public async Task TaskWaitsForAllToComplete()
    {
        store.RegisterHandler((ref DummyState s, ExpectAndIncAction action) =>
        {
            Assert.AreEqual(action.Expected, s.Count);
            s.Count += 1;
        });
        store.RegisterSaga<SagaAction>((store, evt) =>
        {
            store.Dispatch(new ExpectAndIncAction(0));
            return Task.CompletedTask;
        });
        store.RegisterSaga<SagaAction>(async (store, evt) =>
        {
            await Task.Delay(500);
            store.Dispatch(new ExpectAndIncAction(1));
        });
        Assert.That(store.Project(Count), Is.EqualTo(0));
        await store.DispatchAndPublishAsync(new SagaAction());
        Assert.That(store.Project(Count), Is.EqualTo(2));
    }

    /// <summary>
    /// An exception thrown by a handler that a saga dispatches to surfaces when the
    /// task returned by <c>DispatchAndPublishAsync</c> is awaited.
    /// </summary>
    [Test]
    public async Task DispatchAndPublishAsyncThrowsSagaException()
    {
        store.RegisterHandler((ref DummyState s, ExpectAndIncAction action) =>
        {
            throw new InvalidCastException();
        });
        store.RegisterSaga<SagaAction>((store, evt) =>
        {
            store.Dispatch(new ExpectAndIncAction(0));
            return Task.CompletedTask;
        });
        Assert.That(store.Project(Count), Is.EqualTo(0));
        try
        {
            await store.DispatchAndPublishAsync(new SagaAction());
            Assert.Fail("exception not thrown");
        }
        catch (InvalidCastException)
        {
            // Expected: the handler's exception propagated through the saga task.
        }
    }
}
}
/// <summary>
/// A Redux-like store built on <c>Ref&lt;TState&gt;</c>: actions are dispatched to handlers
/// registered per action type, which mutate the state in place, and to sagas, which are
/// asynchronous functions that receive the store and the action.
/// </summary>
/// <typeparam name="TActionBase">Common base type of all actions.</typeparam>
/// <typeparam name="TState">The state type; must be a struct.</typeparam>
public class Store<TActionBase, TState> where TState : struct
{
    /// <summary>An asynchronous reaction to an action.</summary>
    /// <param name="store">The store the action was dispatched on.</param>
    /// <param name="evt">The dispatched action.</param>
    public delegate Task Saga(Store<TActionBase, TState> store, TActionBase evt);

    protected readonly Ref<TState> _ref;

    // Handlers and sagas are keyed by the exact runtime type of the action.
    private readonly Dictionary<Type, List<RefAction<TState, TActionBase>>> _handlerMap =
        new Dictionary<Type, List<RefAction<TState, TActionBase>>>();
    private readonly List<RefAction<TState, TActionBase>> _globalBeforeHandlers =
        new List<RefAction<TState, TActionBase>>();
    private readonly Dictionary<Type, List<Saga>> _sagaMap =
        new Dictionary<Type, List<Saga>>();

    /// <summary>Creates a store whose underlying ref uses its default publish strategy.</summary>
    /// <param name="start">The initial state.</param>
    public Store(TState start)
    {
        _ref = new Ref<TState>(start);
    }

    /// <summary>Creates a store with an explicit publish strategy.</summary>
    /// <param name="start">The initial state.</param>
    /// <param name="refPublishStrategy">Passed through to the underlying ref.</param>
    public Store(TState start, Action<Action> refPublishStrategy)
    {
        _ref = new Ref<TState>(start, refPublishStrategy);
    }

    /// <summary>
    /// Registers a state-mutating handler for actions of type <typeparamref name="TEvent"/>.
    /// </summary>
    /// <param name="f">The handler; receives the state by reference and the action.</param>
    public void RegisterHandler<TEvent>(RefAction<TState, TEvent> f) where TEvent : TActionBase
    {
        var t = typeof(TEvent);
        List<RefAction<TState, TActionBase>> list;
        if (!_handlerMap.TryGetValue(t, out list))
        {
            list = new List<RefAction<TState, TActionBase>>();
            _handlerMap[t] = list;
        }
        // Adapt the strongly typed handler to the base-typed signature stored in the map.
        RefAction<TState, TActionBase> wrappedHandler = (ref TState state, TActionBase e) =>
        {
            f(ref state, (TEvent)e);
        };
        list.Add(wrappedHandler);
    }

    /// <summary>
    /// Registers a handler that runs before the type-specific handlers of a dispatch.
    /// </summary>
    /// <param name="f">The handler; receives the state by reference and the action.</param>
    public void RegisterGlobalHandler(RefAction<TState, TActionBase> f)
    {
        _globalBeforeHandlers.Add(f);
    }

    /// <summary>
    /// Registers a saga for actions of type <typeparamref name="TEvent"/>.
    /// </summary>
    /// <param name="f">The saga to run when such an action is dispatched.</param>
    public void RegisterSaga<TEvent>(Saga f) where TEvent : TActionBase
    {
        var t = typeof(TEvent);
        List<Saga> list;
        if (!_sagaMap.TryGetValue(t, out list))
        {
            list = new List<Saga>();
            _sagaMap[t] = list;
        }
        // The wrapper always passes this store and checks the action type via the cast.
        Saga wrappedHandler = (state, e) =>
            f(this, (TEvent)e);
        list.Add(wrappedHandler);
    }

    /// <summary>
    /// Dispatches <paramref name="e"/>: runs the handlers registered for its runtime type
    /// (preceded by the global handlers) as one state mutation, then starts every saga
    /// registered for that type.
    /// </summary>
    /// <param name="e">The action to dispatch.</param>
    /// <returns>
    /// A task that completes when the state publish (if any) and all started sagas
    /// have completed.
    /// </returns>
    public Task DispatchAndPublishAsync(TActionBase e)
    {
        var eventType = e.GetType();
        var tasks = new List<Task>();

        // NOTE(review): global handlers only run when at least one type-specific handler
        // is registered for this action type - confirm that this is intended.
        List<RefAction<TState, TActionBase>> handlers;
        if (_handlerMap.TryGetValue(eventType, out handlers))
        {
            tasks.Add(_ref.SubmitAndPublishAsync((ref TState state) =>
            {
                foreach (var beforeHandler in _globalBeforeHandlers)
                {
                    beforeHandler(ref state, e);
                }
                foreach (var handler in handlers)
                {
                    handler(ref state, e);
                }
            }));
        }

        List<Saga> sagas;
        if (_sagaMap.TryGetValue(eventType, out sagas))
        {
            // Iterate over a snapshot: a saga started synchronously may register another
            // saga for the same action type, which would otherwise invalidate the
            // enumeration of the live list.
            foreach (var saga in sagas.ToArray())
            {
                tasks.Add(_ref.PublishAction(() => saga(this, e)));
            }
        }

        return Task.WhenAll(tasks);
    }

    /// <summary>
    /// Fire-and-forget variant of <c>DispatchAndPublishAsync</c>: the returned task is
    /// discarded.
    /// </summary>
    /// <param name="e">The action to dispatch.</param>
    public void Dispatch(TActionBase e)
    {
        DispatchAndPublishAsync(e);
    }

    /// <summary>Forwards to the underlying ref's <c>AllComplete</c>.</summary>
    public Task AllComplete => _ref.AllComplete;

    /// <summary>
    /// Observes a projection of the state, suppressing consecutive duplicate values.
    /// </summary>
    /// <param name="project">Computes the observed value from the state.</param>
    public IObservable<TResult> Observe<TResult>(RefFunc<TState, TResult> project) =>
        _ref.Observe(project).DistinctUntilChanged();

    /// <summary>Applies a projection to the current state and returns the result.</summary>
    /// <param name="projection">Computes the result from the current state.</param>
    public TResult Project<TResult>(RefFunc<TState, TResult> projection) =>
        _ref.Project(projection);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment