Skip to content

Instantly share code, notes, and snippets.

@afruzan
Created February 9, 2023 15:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save afruzan/70ee281d303030cb93fa39a4b3cd4f34 to your computer and use it in GitHub Desktop.
Save afruzan/70ee281d303030cb93fa39a4b3cd4f34 to your computer and use it in GitHub Desktop.
C# thread safe local Event Bus
using System.Collections.Concurrent;
using System.Linq.Expressions;
using System.Reflection;
namespace Afruzan.Shared.EventBus;
public class LocalEventBus : IEventBus
{
public LocalEventBus()
{
}
private readonly ConcurrentDictionary<Type, ConcurrentDictionary<Action<object>, int>> listeners = new();
public Task Register<TEvent>(Action<object> listener)
{
if (listener == null)
{
throw new ArgumentNullException(nameof(listener));
}
var eventType = typeof(TEvent);
if (!listener.GetMethodInfo().GetParameters()[0].ParameterType.IsAssignableFrom(eventType))
{
throw new ArgumentException("listener argumnet is not assinable from event type.", nameof(listener));
}
var listenersBag = listeners.GetOrAdd(eventType, t => new());
listenersBag.AddOrUpdate(listener, 1, (l, registerAttemptsCount) => registerAttemptsCount + 1);
return Task.CompletedTask;
}
public Task Unregister<TEvent>(Action<object> listener)
{
if (listener == null)
{
throw new ArgumentNullException(nameof(listener));
}
var eventType = typeof(TEvent);
if (!listener.GetMethodInfo().GetParameters()[0].ParameterType.IsAssignableFrom(eventType))
{
throw new ArgumentException("listener argumnet is not assinable from event type.", nameof(listener));
}
if (listeners.TryGetValue(eventType, out var listenersBag))
{
listenersBag.TryRemove(listener, out _);
}
return Task.CompletedTask;
}
public void PostEvent<TEvent, TEventObject>(TEventObject @event) where TEventObject : TEvent
{
if (@event == null)
{
throw new ArgumentNullException(nameof(@event));
}
var eventType = typeof(TEvent);
if (listeners.TryGetValue(eventType, out var listenersBag))
{
int invokedCount = 0;
// The enumerator returned from the dictionary is safe to use concurrently with reads and writes to the dictionary,
// however it does not represent a moment-in-time snapshot of the dictionary. The contents exposed through the enumerator,
// may contain modifications made to the dictionary after GetEnumerator was called.
var enumerator = listenersBag.GetEnumerator();
while (enumerator.MoveNext())
{
enumerator.Current.Key.Invoke(@event);
invokedCount++;
}
}
}
public Task PostEventAsync<TEvent, TEventObject>(TEventObject @event) where TEventObject : TEvent
{
return Task.Run(() => PostEvent<TEvent, TEventObject>(@event));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment