Skip to content

Instantly share code, notes, and snippets.

@perokvist
Last active June 14, 2017 08:56
Show Gist options
  • Save perokvist/ef866f886df25d93ef7e9cca283456c0 to your computer and use it in GitHub Desktop.
Save perokvist/ef866f886df25d93ef7e9cca283456c0 to your computer and use it in GitHub Desktop.
C# EventProcessor
public class EventProcessor
{
private readonly IDictionary<IAggregateId, (Guid, SemaphoreSlim)> _locks;
private readonly Action<string> _logger;
private readonly List<(Type, Func<IEvent, Task>)> _h = new List<(Type, Func<IEvent, Task>)>();
public EventProcessor(IDictionary<IAggregateId, (Guid, SemaphoreSlim)> locks) : this(locks, s => { })
{}
public EventProcessor(IDictionary<IAggregateId, (Guid, SemaphoreSlim)> locks, Action<string> logger)
{
_locks = locks;
_logger = logger;
}
public void Register<T>(Func<T, Task> f)
where T : IEvent
=> _h.Add((typeof(T), @event => f((T)@event)));
public async Task PublishAsync(params IEvent[] events)
{
if (!events.All(e => e.Meta.ContainsKey(nameof(EventMetaData.CorrelationId))))
throw new ArgumentException("CorrelationId required");
var h = events.SelectMany(e => _h
.Where(DelegatefForTypeOrInterface(e))
.Select(ExecuteDelegate(e)))
.ToArray();
await Task.WhenAll(h.Select(x => x.Task));
//Only release once per aggregate
h.GroupBy(x => x.Correlation)
.Select(x => x.First())
.ForEach(x => ReleaseIfPresent(_locks, x.AggregateId, x.Correlation, _logger));
}
private static Func<(Type, Func<IEvent, Task> Func), (Task Task, IAggregateId AggregateId, Guid Correlation)> ExecuteDelegate(IEvent e)
=> f => (f.Func(e), new AggregateId(e.AggregateId), Guid.Parse(e.Meta[nameof(EventMetaData.CorrelationId)]));
private static Func<(Type Type, Func<IEvent, Task>), bool> DelegatefForTypeOrInterface(IEvent e)
=> kv => kv.Type == e.GetType() || e.GetType().GetTypeInfo().GetInterfaces().Any(t => t == kv.Type);
private static void ReleaseIfPresent(IDictionary<IAggregateId, (Guid Guid, SemaphoreSlim SemaphoreSlim)> locks,
IAggregateId aggregateId, Guid correlationId, Action<string> logger)
{
if (!locks.ContainsKey(aggregateId))
{
logger($"Lock for {aggregateId.Id} not found.");
return;
}
if (locks[aggregateId].Guid != correlationId)
{
logger($"Lock for {aggregateId.Id} with correlation {correlationId} not found.");
return;
}
logger($"Lock for {aggregateId.Id} with correlation {correlationId} about to release.");
locks[aggregateId].SemaphoreSlim.Release();
logger($"Lock for {aggregateId.Id} with correlation {correlationId} released.");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment