Skip to content

Instantly share code, notes, and snippets.

@smudge202
Last active October 23, 2015 09:17
Show Gist options
  • Save smudge202/7c426c5295a5136c5489 to your computer and use it in GitHub Desktop.
Save smudge202/7c426c5295a5136c5489 to your computer and use it in GitHub Desktop.
Composition based Event bus subscriptions
using Composition;
using System;
namespace Domain
{
/// <summary>
/// Extension methods that hook the <see cref="EventBus"/> into an <see cref="Executable"/>.
/// </summary>
public static class ApplicationExtensions
{
    /// <summary>
    /// Hands <c>app.OnExecute</c> a callback that publishes an <see cref="EventBusStarted"/> event
    /// on the <see cref="EventBus"/> it is given.
    /// In debug builds the callback also subscribes a temporary probe and
    /// <see cref="System.Diagnostics.Debug.Assert(bool, string)">asserts</see> that the probe
    /// received the event, i.e. that the bus is able to send and receive.
    /// </summary>
    /// <param name="app">The executable to attach the callback to.</param>
    /// <exception cref="ArgumentNullException"><paramref name="app"/> is <c>null</c>.</exception>
    public static void UseEventBus(this Executable app)
    {
        if (app == null)
            throw new ArgumentNullException(nameof(app));

        app.OnExecute<EventBus>(bus =>
        {
#if DEBUG
            var test = new EventBusTest();
            // The probe is only needed for this one publish; dispose its subscription
            // afterwards so it does not stay registered on the bus.
            using (bus.Subscribe(test))
            {
                bus.Publish(new EventBusStarted());
            }
            System.Diagnostics.Debug.Assert(test.ReceivedEvent, "The event bus has not started correctly");
#else
            bus.Publish(new EventBusStarted());
#endif
        });
    }

#if DEBUG
    /// <summary>
    /// Debug-only probe that records whether an <see cref="EventBusStarted"/> event was delivered.
    /// </summary>
    private class EventBusTest : IObserver<EventBusStarted>
    {
        /// <summary><c>true</c> once <see cref="OnNext"/> has been called.</summary>
        public bool ReceivedEvent { get; private set; }

        public void OnNext(EventBusStarted value)
            => ReceivedEvent = true;

        // Completion and errors are irrelevant to the probe.
        public void OnCompleted() { }
        public void OnError(Exception error) { }
    }
#endif
}
}
using System;
namespace Domain
{
/// <summary>
/// General purpose event bus to provide the ability to publish events to
/// [n]one to many undetermined subscribers.
///
/// Event Buses are added to the composition graph as a singleton, so each
/// injected instance can be treated as such.
/// </summary>
public interface EventBus
{
/// <summary>
/// Subscriptions are made by Event <see cref="Type"/>.
/// </summary>
/// <typeparam name="T">The event type the <paramref name="observer"/> is subscribed for.</typeparam>
/// <param name="observer">
/// When events are <see cref="Publish{T}(T)">published</see>, the <see cref="IObserver{T}.OnNext(T)">OnNext</see>
/// method of your observer will be called by the <see cref="EventBus">Event Bus</see>.
///
/// There are Helper methods available in the ObservableHelpers assembly to
/// generate an <see cref="IObserver{T}"/> from an <see cref="Action{T}"/>
/// </param>
/// <returns>
/// A subscription. Dispose it to stop receiving events.
///
/// NOTE: the in-memory implementation in this codebase keeps the observer in its own subscriber list until
/// the subscription is disposed, so merely dropping the returned reference does not unsubscribe the observer;
/// keep the subscription if you intend to unsubscribe later.
///
/// There are Helper methods available in the ObservableHelpers assembly to
/// hold the subscription for the lifetime of the Application Domain, to save you creating otherwise redundant private fields.
/// </returns>
IDisposable Subscribe<T>(IObserver<T> observer);
/// <summary>
/// Publishes the given <paramref name="event"/> to all active subscriptions
/// </summary>
/// <typeparam name="T">
/// The type the event is published as. In the in-memory implementation, subscriptions are matched
/// against this static type (not the runtime type of <paramref name="event"/>).
/// </typeparam>
/// <param name="event">The event instance handed to each matching observer.</param>
void Publish<T>(T @event);
}
}
using Composition;
using FluentAssertions;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Moq;
using System;
using System.Collections.Generic;
using System.Linq;
namespace Domain.Tests
{
/// <summary>
/// Unit tests for <see cref="NonPersistentInMemoryEventBus"/> and the composition extensions
/// that register subscribers for it.
/// </summary>
[TestClass]
public class EventBusTests
{
    // A fresh, empty subscriber registry for every access.
    private static Subscribers DefaultSubscribers => new Subscribers();

    // No deferred subscriptions by default.
    private static IEnumerable<Action<Subscribers>> DefaultSubscriptions
        => Enumerable.Empty<Action<Subscribers>>();

    // Builds the bus under test with default (empty) collaborators.
    private static EventBus CreateTarget()
        => new NonPersistentInMemoryEventBus(DefaultSubscribers, DefaultSubscriptions);

    [TestMethod]
    public void WhenSubscribersNotProvidedThenThrowsException()
    {
        Action construct = () => new NonPersistentInMemoryEventBus(null, DefaultSubscriptions);

        construct.ShouldThrow<ArgumentNullException>();
    }

    [TestMethod]
    public void WhenUnrealizedSubscriptionsNotProvidedThenThrowsException()
    {
        Action construct = () => new NonPersistentInMemoryEventBus(DefaultSubscribers, null);

        construct.ShouldThrow<ArgumentNullException>();
    }

    [TestMethod]
    public void WhenPublishingDespiteNoSubscriptionsThenDoesNotThrowException()
    {
        Action publish = () => CreateTarget().Publish(new object());

        publish.ShouldNotThrow<Exception>();
    }

    [TestMethod]
    public void WhenObserverNotProvidedThenThrowsException()
    {
        Action subscribe = () => CreateTarget().Subscribe<object>(null);

        subscribe.ShouldThrow<ArgumentNullException>();
    }

    [TestMethod]
    public void WhenObserverProvidedThenReturnsSubscription()
    {
        var bus = CreateTarget();

        using (var subscription = bus.Subscribe(new Fake.Observer()))
        {
            subscription.Should().NotBeNull();
        }
    }

    [TestMethod]
    public void WhenSubscriberExistsForPublishedEventThenSubscriberIsNotified()
    {
        var bus = CreateTarget();
        var observerMock = new Mock<IObserver<Fake.Event>>();
        var published = new Fake.Event();

        using (var subscription = bus.Subscribe(observerMock.Object))
        {
            bus.Publish(published);
        }

        observerMock.Verify(m => m.OnNext(published), Times.Once);
    }

    [TestMethod]
    public void WhenSubscriberExistsForDerivativeOfPublishedEventThenSubscriberIsNotified()
    {
        var bus = CreateTarget();
        var observerMock = new Mock<IObserver<Fake.Event>>();
        var published = new Fake.DerivativeEvent();

        using (var subscription = bus.Subscribe(observerMock.Object))
        {
            bus.Publish(published);
        }

        observerMock.Verify(m => m.OnNext(published), Times.Once);
    }

    [TestMethod]
    public void WhenSubscriptionIsDisposedThenSubscribedIsNotNotifiedOfPublishedEvents()
    {
        var bus = CreateTarget();
        var observerMock = new Mock<IObserver<Fake.Event>>();
        var published = new Fake.Event();

        // Subscribe and immediately dispose before anything is published.
        using (var subscription = bus.Subscribe(observerMock.Object))
        {
        }
        bus.Publish(published);

        observerMock.Verify(m => m.OnNext(published), Times.Never);
    }

    [TestMethod]
    public void WhenSubscribersAddedToCompositionThenSubscriberReceivesPublishedEvents()
    {
        var application = new Fake.Application();
        IDisposable subscription;
        application.UseServices(services =>
        {
            services.AddEventBus();
            subscription = services.AddSubscriber<Fake.Observer, Fake.Event>();
        });
        application.UseEventBus();

        var published = new Fake.Event();
        application.OnExecute<EventBus>(bus => bus.Publish(published));

        // Nothing is delivered until the application actually executes.
        Fake.Observer.ReceivedEvent.Should().BeNull();
        application.Execute();
        Fake.Observer.ReceivedEvent.Should().Be(published);
    }
}
}
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
namespace Domain
{
/// <summary>
/// This is a thread safe, in-memory only implementation of the <see cref="EventBus"/>.
///
/// It is strictly in-memory, which replicates the behaviour of the current system (i.e. if the lights go out, you lose all
/// current transactions). I highly recommend upgrading to a reliable infrastructure if dependence on this service grows.
/// </summary>
/// <remarks>
/// Deferred ("unrealized") subscriptions supplied at construction are applied to the subscriber
/// registry lazily, on the first <see cref="Publish{T}(T)"/> that finds any pending.
///
/// NOTE(review): realization is not synchronized with concurrent publishers - a second thread can
/// observe an empty queue while the first is still invoking the last dequeued action. Confirm
/// whether that window matters for your usage.
/// </remarks>
internal sealed class NonPersistentInMemoryEventBus : EventBus
{
    private readonly Subscribers _subscribers;

    // FIFO, so deferred subscriptions are realized in the order they were supplied
    // (a stack would register them in reverse).
    private readonly ConcurrentQueue<Action<Subscribers>> _unrealizedSubscriptions;

    /// <summary>
    /// Creates a bus over the given subscriber registry.
    /// </summary>
    /// <param name="subscribers">Registry that stores active subscriptions.</param>
    /// <param name="unrealizedSubscriptions">
    /// Actions that each register a subscriber on the registry; invoked on first publish.
    /// </param>
    /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
    public NonPersistentInMemoryEventBus(Subscribers subscribers, IEnumerable<Action<Subscribers>> unrealizedSubscriptions)
    {
        if (subscribers == null)
            throw new ArgumentNullException(nameof(subscribers));
        if (unrealizedSubscriptions == null)
            throw new ArgumentNullException(nameof(unrealizedSubscriptions));

        _subscribers = subscribers;
        _unrealizedSubscriptions = new ConcurrentQueue<Action<Subscribers>>(unrealizedSubscriptions);
    }

    /// <summary>
    /// Realizes any pending deferred subscriptions, then hands <paramref name="event"/> to every
    /// observer the registry returns for <typeparamref name="T"/>.
    /// </summary>
    public void Publish<T>(T @event)
    {
        if (!_unrealizedSubscriptions.IsEmpty)
            RealizeSubscriptions();

        foreach (var observer in _subscribers.GetSubscribersFor<T>())
            observer.OnNext(@event);
    }

    // Drains the pending queue, letting each action register itself on the registry.
    private void RealizeSubscriptions()
    {
        Action<Subscribers> pending;
        while (_unrealizedSubscriptions.TryDequeue(out pending))
            pending(_subscribers);
    }

    /// <summary>
    /// Registers <paramref name="observer"/> for events of type <typeparamref name="T"/>.
    /// </summary>
    /// <returns>A handle that removes the observer from the registry when disposed.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
    public IDisposable Subscribe<T>(IObserver<T> observer)
    {
        if (observer == null)
            throw new ArgumentNullException(nameof(observer));

        return _subscribers.Add(new Subscriber(typeof(T), observer));
    }
}
}
using Microsoft.Framework.DependencyInjection;
using Microsoft.Framework.DependencyInjection.Extensions;
using System;
namespace Domain
{
// Composition-time registration for the event bus. Subscriptions added here are "deferred":
// each one is registered as an Action<Subscribers> that the bus invokes later, so neither the
// bus nor the subscriber has to be instantiated while the service collection is being built.
public static class ServiceExtensions
{
    /// <summary>
    /// These services must be added in order to use the EventBus. They only need to be added once, but can safely
    /// be called multiple times.
    /// </summary>
    /// <remarks>
    /// The default <see cref="NonPersistentInMemoryEventBus"/> implementation can be overriden by adding another
    /// implementation of <see cref="EventBus"/> before calling this extension, or by adding (not TryAdd) afterwards.
    /// </remarks>
    /// <exception cref="ArgumentNullException"><paramref name="services"/> is <c>null</c>.</exception>
    public static IServiceCollection AddEventBus(this IServiceCollection services)
    {
        if (services == null)
            throw new ArgumentNullException(nameof(services));

        services.TryAddSingleton<Subscribers, Subscribers>();
        services.TryAddSingleton<EventBus, NonPersistentInMemoryEventBus>();
        return services;
    }

    /// <summary>
    /// Allows subscriptions to be made as part of the composition graph. Realization and instantiation
    /// of the subscriber is deferred until the bus invokes the registered action.
    /// </summary>
    /// <returns>
    /// A subscription handle. Dispose it to unsubscribe. Disposing it before the subscription has been
    /// realized prevents the subscriber from being registered at all. The registry holds the subscriber
    /// until the handle is disposed, so dropping the handle does not unsubscribe.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="services"/> is <c>null</c>.</exception>
    public static IDisposable AddSubscriber<Subscriber, Event>(this IServiceCollection services)
        where Subscriber : class, IObserver<Event>
    {
        if (services == null)
            throw new ArgumentNullException(nameof(services));

        services.AddTransient<Subscriber, Subscriber>();
        return services.GetSubscription<Subscriber, Event>();
    }

    /// <summary>
    /// Allows subscriptions to be made for a pre-initialised <paramref name="subscriber"/>, whilst preserving
    /// deferred realization and instantiation of the <see cref="EventBus"/> itself.
    /// </summary>
    /// <returns>
    /// A subscription handle. Dispose it to unsubscribe. Disposing it before the subscription has been
    /// realized prevents the subscriber from being registered at all. The registry holds the subscriber
    /// until the handle is disposed, so dropping the handle does not unsubscribe.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="services"/> or <paramref name="subscriber"/> is <c>null</c>.
    /// </exception>
    public static IDisposable AddSubscriber<Event>(this IServiceCollection services, IObserver<Event> subscriber)
    {
        if (services == null)
            throw new ArgumentNullException(nameof(services));
        if (subscriber == null)
            throw new ArgumentNullException(nameof(subscriber));

        services.AddInstance(subscriber);
        return services.GetSubscription<IObserver<Event>, Event>();
    }

    // Registers the deferred action and returns the handle the caller uses to unsubscribe.
    private static IDisposable GetSubscription<Subscriber, Event>(this IServiceCollection services)
        where Subscriber : class, IObserver<Event>
    {
        var handle = new DeferredSubscription();
        services.AddTransient(provider => GetRealizedSubscription<Subscriber, Event>(provider, handle));
        return handle;
    }

    // Builds the action that resolves the subscriber, adds it to the registry and ties the
    // resulting registration to the caller's handle so that disposing the handle removes it.
    private static Action<Subscribers> GetRealizedSubscription<Subscriber, Event>(IServiceProvider provider, DeferredSubscription handle)
        where Subscriber : IObserver<Event>
    {
        return subscribers =>
        {
            // The caller already unsubscribed; don't instantiate or register anything.
            if (handle.IsDisposed)
                return;

            var observer = provider.GetRequiredService<Subscriber>();
            handle.Attach(subscribers.Add(new Domain.Subscriber(typeof(Event), observer)));
        };
    }

    /// <summary>
    /// Handle returned to callers before the real registration exists. Once the registration is
    /// attached, disposing the handle disposes it; if the handle was disposed first, the
    /// registration is disposed as soon as it is attached.
    /// </summary>
    private sealed class DeferredSubscription : IDisposable
    {
        private readonly object _gate = new object();
        private IDisposable _registration;
        private bool _disposed;

        public bool IsDisposed
        {
            get
            {
                lock (_gate)
                {
                    return _disposed;
                }
            }
        }

        public void Attach(IDisposable registration)
        {
            lock (_gate)
            {
                if (!_disposed)
                {
                    _registration = registration;
                    return;
                }
            }

            // Disposed while realization was in flight: undo the registration immediately.
            registration?.Dispose();
        }

        public void Dispose()
        {
            IDisposable registration;
            lock (_gate)
            {
                _disposed = true;
                registration = _registration;
                _registration = null;
            }

            // Dispose outside the lock; a second Dispose finds nothing to do.
            registration?.Dispose();
        }
    }
}
}
using System;
namespace Domain
{
/// <summary>
/// Pairs an observer with the event <see cref="System.Type"/> it was subscribed for.
/// </summary>
internal sealed class Subscriber
{
    /// <summary>
    /// Creates the pairing.
    /// </summary>
    /// <param name="type">The event type the observer is subscribed for.</param>
    /// <param name="subscriber">The observer instance, stored untyped.</param>
    /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
    public Subscriber(Type type, object subscriber)
    {
        // Fail here rather than later, when the registry dereferences these during a publish.
        if (type == null)
            throw new ArgumentNullException(nameof(type));
        if (subscriber == null)
            throw new ArgumentNullException(nameof(subscriber));

        Type = type;
        Observer = subscriber;
    }

    /// <summary>The event type this subscriber was registered for.</summary>
    public Type Type { get; }

    /// <summary>The observer instance supplied at construction.</summary>
    public object Observer { get; }
}
}
using System;
using System.Collections.Generic;
using System.Threading;
namespace Domain
{
/// <summary>
/// Registry of active <see cref="Subscriber"/> entries, guarded by a reader/writer lock.
/// </summary>
internal sealed class Subscribers
{
    private readonly List<Subscriber> _subscribers = new List<Subscriber>();

    /// <summary>
    /// Enumeration of <see cref="_subscribers"/> in the <see cref="GetSubscribersFor{T}"/> method
    /// makes usage of a ConcurrentCollection much more difficult, so I've gone with a simple, manual lock.
    /// </summary>
    private readonly ReaderWriterLockSlim _subscriberLock = new ReaderWriterLockSlim();

    /// <summary>
    /// Adds <paramref name="subscriber"/> to the registry.
    /// </summary>
    /// <returns>A handle that calls <see cref="Remove"/> for this subscriber when disposed.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="subscriber"/> is <c>null</c>.</exception>
    public IDisposable Add(Subscriber subscriber)
    {
        if (subscriber == null)
            throw new ArgumentNullException(nameof(subscriber));

        _subscriberLock.EnterWriteLock();
        try
        {
            _subscribers.Add(subscriber);
            return Subscription.For(subscriber, Remove);
        }
        finally
        {
            _subscriberLock.ExitWriteLock();
        }
    }

    /// <summary>
    /// Removes <paramref name="subscriber"/> from the registry; a subscriber that is not present is ignored.
    /// </summary>
    public void Remove(Subscriber subscriber)
    {
        _subscriberLock.EnterWriteLock();
        try
        {
            _subscribers.Remove(subscriber);
        }
        finally
        {
            _subscriberLock.ExitWriteLock();
        }
    }

    /// <summary>
    /// Returns a snapshot of the observers whose subscribed type is assignable from
    /// <typeparamref name="T"/> and that can actually be used as an <see cref="IObserver{T}"/>.
    /// </summary>
    /// <remarks>
    /// Generic variance only applies to reference types, so e.g. an <c>IObserver&lt;object&gt;</c>
    /// cannot be treated as an <c>IObserver&lt;int&gt;</c> even though <c>object</c> is assignable
    /// from <c>int</c>. Such observers are skipped instead of failing the whole lookup with an
    /// <see cref="InvalidCastException"/>.
    /// </remarks>
    public IEnumerable<IObserver<T>> GetSubscribersFor<T>()
    {
        var type = typeof(T);
        var results = new List<IObserver<T>>();
        _subscriberLock.EnterReadLock();
        try
        {
            foreach (var subscriber in _subscribers)
            {
                if (!subscriber.Type.IsAssignableFrom(type))
                    continue;

                var observer = subscriber.Observer as IObserver<T>;
                if (observer != null)
                    results.Add(observer);
            }
        }
        finally
        {
            _subscriberLock.ExitReadLock();
        }
        // The copy is returned so callers enumerate outside the lock.
        return results;
    }
}
}
using System;
namespace Domain
{
/// <summary>
/// Factory for the <see cref="IDisposable"/> subscription handles used by the event bus.
/// </summary>
internal class Subscription
{
    /// <summary>
    /// Creates a handle for a subscription whose observer and bus registration are supplied later.
    /// </summary>
    internal static DeferredRealization<Event> For<Event>()
    {
        return new DeferredRealization<Event>();
    }

    /// <summary>
    /// A subscription handle that exists before its observer has been resolved or subscribed.
    /// </summary>
    internal class DeferredRealization<Event> : IDisposable
    {
        /// <summary>The bus subscription; <c>null</c> until <see cref="Subscribe"/> has been called.</summary>
        public IDisposable Subscription { get; private set; }

        /// <summary>The resolved observer; <c>null</c> until <see cref="Realize{Subscriber}"/> has been called.</summary>
        public IObserver<Event> RealizedService { get; private set; }

        /// <summary>Stores the resolved observer.</summary>
        public void Realize<Subscriber>(Subscriber subscriber) where Subscriber : IObserver<Event>
            => RealizedService = subscriber;

        /// <summary>Subscribes the realized observer on <paramref name="bus"/> and keeps the resulting handle.</summary>
        public void Subscribe(EventBus bus)
            => Subscription = bus.Subscribe(RealizedService);

        /// <summary>
        /// Disposes the bus subscription if there is one. Safe to call before
        /// <see cref="Subscribe"/> has run, in which case there is nothing to dispose.
        /// </summary>
        public void Dispose()
            => Subscription?.Dispose();
    }

    /// <summary>
    /// Creates a handle that invokes <paramref name="dispose"/> with <paramref name="subscriber"/> when disposed.
    /// </summary>
    internal static DelegatedDisposal For(Subscriber subscriber, Action<Subscriber> dispose)
    {
        return new DelegatedDisposal(subscriber, dispose);
    }

    /// <summary>
    /// An <see cref="IDisposable"/> whose disposal is delegated to a supplied callback.
    /// </summary>
    internal class DelegatedDisposal : IDisposable
    {
        private readonly Action<Subscriber> _dispose;
        private readonly Subscriber _subscriber;

        public DelegatedDisposal(Subscriber subscriber, Action<Subscriber> dispose)
        {
            _subscriber = subscriber;
            _dispose = dispose;
        }

        /// <summary>Invokes the callback with the subscriber supplied at construction.</summary>
        public void Dispose()
            => _dispose(_subscriber);
    }
}
}
@smudge202
Copy link
Author

Until we get this into a repo in a new Gilmond organisation and respective full OSS approval, @shaundodimead has signed off on us taking innovation from the project in the Clean Development Game Engine.

@herecydev
Copy link

GC won't "dispose" your subscription.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment