Last active
June 14, 2017 08:56
-
-
Save perokvist/ef866f886df25d93ef7e9cca283456c0 to your computer and use it in GitHub Desktop.
C# EventProcessor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Dispatches events to the handlers registered for their type and, once the
/// dispatched handlers have finished, releases the semaphore stored in the lock
/// table for the event's aggregate when the stored correlation id matches the
/// one carried by the event.
/// </summary>
public class EventProcessor
{
    // Lock table supplied by the caller: aggregate id -> (correlation id of the holder, semaphore).
    private readonly IDictionary<IAggregateId, (Guid, SemaphoreSlim)> _locks;

    // Sink for diagnostic messages written while releasing locks.
    private readonly Action<string> _logger;

    // Registered handlers as (handled type, type-erased delegate) pairs, in registration order.
    private readonly List<(Type, Func<IEvent, Task>)> _h = new List<(Type, Func<IEvent, Task>)>();

    /// <summary>
    /// Creates a processor that discards its diagnostic messages.
    /// </summary>
    /// <param name="locks">Lock table consulted when releasing after a publish.</param>
    /// <exception cref="ArgumentNullException"><paramref name="locks"/> is null.</exception>
    public EventProcessor(IDictionary<IAggregateId, (Guid, SemaphoreSlim)> locks) : this(locks, s => { })
    {
    }

    /// <summary>
    /// Creates a processor that reports lock handling through <paramref name="logger"/>.
    /// </summary>
    /// <param name="locks">Lock table consulted when releasing after a publish.</param>
    /// <param name="logger">Receives diagnostic messages; null is treated as "log nothing".</param>
    /// <exception cref="ArgumentNullException"><paramref name="locks"/> is null.</exception>
    public EventProcessor(IDictionary<IAggregateId, (Guid, SemaphoreSlim)> locks, Action<string> logger)
    {
        _locks = locks ?? throw new ArgumentNullException(nameof(locks));
        _logger = logger ?? (s => { });
    }

    /// <summary>
    /// Registers a handler for events whose runtime type is <typeparamref name="T"/>
    /// or that implement <typeparamref name="T"/> as an interface.
    /// </summary>
    /// <typeparam name="T">Event type or interface the handler accepts.</typeparam>
    /// <param name="f">Asynchronous handler to invoke for matching events.</param>
    /// <exception cref="ArgumentNullException"><paramref name="f"/> is null.</exception>
    public void Register<T>(Func<T, Task> f)
        where T : IEvent
    {
        if (f == null)
        {
            throw new ArgumentNullException(nameof(f));
        }

        _h.Add((typeof(T), @event => f((T)@event)));
    }

    /// <summary>
    /// Invokes every matching handler for each event, awaits all of them, and then
    /// attempts one lock release per distinct correlation id that was dispatched.
    /// </summary>
    /// <param name="events">Events to publish; each must carry a CorrelationId meta entry.</param>
    /// <returns>A task that completes when all handlers have finished and releases were attempted.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="events"/> is null.</exception>
    /// <exception cref="ArgumentException">An event has no CorrelationId meta entry.</exception>
    public async Task PublishAsync(params IEvent[] events)
    {
        if (events == null)
        {
            throw new ArgumentNullException(nameof(events));
        }

        if (!events.All(e => e.Meta.ContainsKey(nameof(EventMetaData.CorrelationId))))
        {
            throw new ArgumentException("CorrelationId required");
        }

        // ToArray forces the dispatch now, so every handler is started exactly once.
        var dispatched = events
            .SelectMany(e => _h
                .Where(DelegateForTypeOrInterface(e))
                .Select(ExecuteDelegate(e)))
            .ToArray();

        try
        {
            await Task.WhenAll(dispatched.Select(x => x.Task));
        }
        finally
        {
            // Release even when a handler faulted or was cancelled; otherwise the
            // semaphore would stay held and the failure would still propagate.
            // One attempt per correlation id: the first dispatched entry of each
            // correlation supplies the aggregate whose lock is looked up.
            foreach (var entry in dispatched.GroupBy(x => x.Correlation).Select(g => g.First()))
            {
                ReleaseIfPresent(_locks, entry.AggregateId, entry.Correlation, _logger);
            }
        }
    }

    /// <summary>
    /// Builds the projection that starts a handler for <paramref name="e"/> and pairs
    /// the resulting task with the event's aggregate id and correlation id.
    /// </summary>
    private static Func<(Type, Func<IEvent, Task> Func), (Task Task, IAggregateId AggregateId, Guid Correlation)> ExecuteDelegate(IEvent e)
        => handler =>
        {
            // Parse before invoking so a malformed correlation id fails before the
            // handler has been started, rather than orphaning a running task.
            var correlation = Guid.Parse(e.Meta[nameof(EventMetaData.CorrelationId)]);
            IAggregateId aggregateId = new AggregateId(e.AggregateId);
            return (handler.Func(e), aggregateId, correlation);
        };

    /// <summary>
    /// Builds the predicate selecting handlers registered for the exact runtime type
    /// of <paramref name="e"/> or for one of the interfaces that type implements.
    /// </summary>
    private static Func<(Type Type, Func<IEvent, Task>), bool> DelegateForTypeOrInterface(IEvent e)
    {
        // Resolved once per event instead of once per registered handler.
        var eventType = e.GetType();
        var interfaces = eventType.GetTypeInfo().GetInterfaces();
        return kv => kv.Type == eventType || interfaces.Any(t => t == kv.Type);
    }

    /// <summary>
    /// Releases the semaphore stored for <paramref name="aggregateId"/> when an entry
    /// exists and its correlation id equals <paramref name="correlationId"/>; otherwise
    /// only logs why nothing was released.
    /// </summary>
    private static void ReleaseIfPresent(IDictionary<IAggregateId, (Guid Guid, SemaphoreSlim SemaphoreSlim)> locks,
        IAggregateId aggregateId, Guid correlationId, Action<string> logger)
    {
        // Single lookup: avoids the gap between a ContainsKey check and the indexer read.
        if (!locks.TryGetValue(aggregateId, out var entry))
        {
            logger($"Lock for {aggregateId.Id} not found.");
            return;
        }

        if (entry.Guid != correlationId)
        {
            logger($"Lock for {aggregateId.Id} with correlation {correlationId} not found.");
            return;
        }

        logger($"Lock for {aggregateId.Id} with correlation {correlationId} about to release.");
        entry.SemaphoreSlim.Release();
        logger($"Lock for {aggregateId.Id} with correlation {correlationId} released.");
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment