Last active
October 23, 2015 09:17
-
-
Save smudge202/7c426c5295a5136c5489 to your computer and use it in GitHub Desktop.
Composition based Event bus subscriptions
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Composition; | |
using System; | |
namespace Domain | |
{ | |
public static class ApplicationExtensions | |
{ | |
/// <summary> | |
/// This extension will simply fire an <see cref="EventBusStarted"/> event. | |
/// In debug mode, the extension will also <see cref="System.Diagnostics.Debug.Assert">Assert</see> that | |
/// the event bus is able to send and receive. | |
/// </summary> | |
public static void UseEventBus(this Executable app) | |
{ | |
app.OnExecute<EventBus>(bus => | |
{ | |
#if DEBUG | |
var test = new EventBusTest(); | |
bus.Subscribe(test); | |
#endif | |
bus.Publish(new EventBusStarted()); | |
#if DEBUG | |
System.Diagnostics.Debug.Assert(test.ReceivedEvent, "The event bus has not started correctly"); | |
#endif | |
}); | |
} | |
#if DEBUG | |
private class EventBusTest : IObserver<EventBusStarted> | |
{ | |
public bool ReceivedEvent { get; private set; } | |
public void OnNext(EventBusStarted value) | |
=> ReceivedEvent = true; | |
public void OnCompleted() { } | |
public void OnError(Exception error) { } | |
} | |
#endif | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
namespace Domain | |
{ | |
/// <summary> | |
/// General purpose event bus to provide the ability to publish events to | |
/// [n]one to many undetermined subscribers. | |
/// | |
/// Event Buses are added to the composition graph as a singleton, so each | |
/// injected instance can be treated as such. | |
/// </summary> | |
public interface EventBus | |
{ | |
/// <summary> | |
/// Subscriptions are made by Event <see cref="Type"/>. | |
/// </summary> | |
/// <param name="observer"> | |
/// When events are <see cref="Publish{T}(T)">published</see>, the <see cref="IObserver{T}.OnNext(T)">OnNext</see> | |
/// method of your observer will be called by the <see cref="EventBus">Event Bus</see>. | |
/// | |
/// There are Helper methods available in the ObservableHelpers assembly to | |
/// generate an <see cref="IObserver{T}"/> from an <see cref="Action{T}"/> | |
/// </param> | |
/// <returns> | |
/// A subscription. These subscriptions MUST be stored or you risk the Garbage Collection disposing your subscription. | |
/// | |
/// There are Helper methods available in the ObservableHelpers assembly to | |
/// hold the subscription for the lifetime of the Application Domain, to save you creating otherwise redundant private fields. | |
/// </returns> | |
IDisposable Subscribe<T>(IObserver<T> observer); | |
/// <summary> | |
/// Publishes the given <paramref name="event"/> to all active subscriptions | |
/// </summary> | |
void Publish<T>(T @event); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Composition; | |
using FluentAssertions; | |
using Microsoft.VisualStudio.TestTools.UnitTesting; | |
using Moq; | |
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
namespace Domain.Tests | |
{ | |
[TestClass] | |
public class EventBusTests | |
{ | |
private static Subscribers DefaultSubscribers | |
{ | |
get | |
{ | |
return new Subscribers(); | |
} | |
} | |
private static IEnumerable<Action<Subscribers>> DefaultSubscriptions | |
{ | |
get | |
{ | |
return Enumerable.Empty<Action<Subscribers>>(); | |
} | |
} | |
private static EventBus CreateTarget() | |
{ | |
return new NonPersistentInMemoryEventBus(DefaultSubscribers, DefaultSubscriptions); | |
} | |
[TestMethod] | |
public void WhenSubscribersNotProvidedThenThrowsException() | |
{ | |
Action act = () => new NonPersistentInMemoryEventBus(null, DefaultSubscriptions); | |
act.ShouldThrow<ArgumentNullException>(); | |
} | |
[TestMethod] | |
public void WhenUnrealizedSubscriptionsNotProvidedThenThrowsException() | |
{ | |
Action act = () => new NonPersistentInMemoryEventBus(DefaultSubscribers, null); | |
act.ShouldThrow<ArgumentNullException>(); | |
} | |
[TestMethod] | |
public void WhenPublishingDespiteNoSubscriptionsThenDoesNotThrowException() | |
{ | |
Action act = () => CreateTarget().Publish(new object()); | |
act.ShouldNotThrow<Exception>(); | |
} | |
[TestMethod] | |
public void WhenObserverNotProvidedThenThrowsException() | |
{ | |
Action act = () => CreateTarget().Subscribe<object>(null); | |
act.ShouldThrow<ArgumentNullException>(); | |
} | |
[TestMethod] | |
public void WhenObserverProvidedThenReturnsSubscription() | |
{ | |
using (var subscription = CreateTarget().Subscribe(new Fake.Observer())) | |
subscription.Should().NotBeNull(); | |
} | |
[TestMethod] | |
public void WhenSubscriberExistsForPublishedEventThenSubscriberIsNotified() | |
{ | |
var observer = new Mock<IObserver<Fake.Event>>(); | |
var @event = new Fake.Event(); | |
var target = CreateTarget(); | |
using (var subscription = target.Subscribe(observer.Object)) | |
target.Publish(@event); | |
observer.Verify(m => m.OnNext(@event), Times.Once); | |
} | |
[TestMethod] | |
public void WhenSubscriberExistsForDerivativeOfPublishedEventThenSubscriberIsNotified() | |
{ | |
var observer = new Mock<IObserver<Fake.Event>>(); | |
var @event = new Fake.DerivativeEvent(); | |
var target = CreateTarget(); | |
using (var subscription = target.Subscribe(observer.Object)) | |
target.Publish(@event); | |
observer.Verify(m => m.OnNext(@event), Times.Once); | |
} | |
[TestMethod] | |
public void WhenSubscriptionIsDisposedThenSubscribedIsNotNotifiedOfPublishedEvents() | |
{ | |
var observer = new Mock<IObserver<Fake.Event>>(); | |
var @event = new Fake.Event(); | |
var target = CreateTarget(); | |
using (var subscription = target.Subscribe(observer.Object)) { } | |
target.Publish(@event); | |
observer.Verify(m => m.OnNext(@event), Times.Never); | |
} | |
[TestMethod] | |
public void WhenSubscribersAddedToCompositionThenSubscriberReceivesPublishedEvents() | |
{ | |
var app = new Fake.Application(); | |
IDisposable subscription; | |
app.UseServices(services => | |
{ | |
services.AddEventBus(); | |
subscription = services.AddSubscriber<Fake.Observer, Fake.Event>(); | |
}); | |
app.UseEventBus(); | |
var @event = new Fake.Event(); | |
app.OnExecute<EventBus>(bus => bus.Publish(@event)); | |
Fake.Observer.ReceivedEvent.Should().BeNull(); | |
app.Execute(); | |
Fake.Observer.ReceivedEvent.Should().Be(@event); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Concurrent; | |
using System.Collections.Generic; | |
namespace Domain | |
{ | |
/// <summary> | |
/// This is a thread safe, in-memory only implementation of the <see cref="EventBus"/>. | |
/// | |
/// It is strictly in-memory, which replicates the behaviour of the current system (i.e. if the lights go out, you lose all | |
/// current transactions). I highly recommend upgrading to a reliable infrastructure if dependence on this service grows. | |
/// </summary> | |
internal sealed class NonPersistentInMemoryEventBus : EventBus | |
{ | |
private readonly Subscribers _subscribers; | |
private ConcurrentStack<Action<Subscribers>> _unrealizedSubscriptions; | |
public NonPersistentInMemoryEventBus(Subscribers subscribers, IEnumerable<Action<Subscribers>> unrealizedSubscriptions) | |
{ | |
if (subscribers == null) | |
throw new ArgumentNullException(nameof(subscribers)); | |
if (unrealizedSubscriptions == null) | |
throw new ArgumentNullException(nameof(unrealizedSubscriptions)); | |
_subscribers = subscribers; | |
_unrealizedSubscriptions = new ConcurrentStack<Action<Subscribers>>(unrealizedSubscriptions); | |
} | |
public void Publish<T>(T @event) | |
{ | |
if (!_unrealizedSubscriptions.IsEmpty) | |
RealizeSubscriptions(); | |
foreach (var observer in _subscribers.GetSubscribersFor<T>()) | |
observer.OnNext(@event); | |
} | |
private void RealizeSubscriptions() | |
{ | |
Action<Subscribers> unrealizedSubscription; | |
while (_unrealizedSubscriptions.TryPop(out unrealizedSubscription)) | |
unrealizedSubscription(_subscribers); | |
} | |
public IDisposable Subscribe<T>(IObserver<T> observer) | |
{ | |
if (observer == null) | |
throw new ArgumentNullException(nameof(observer)); | |
return _subscribers.Add(new Subscriber(typeof(T), observer)); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Microsoft.Framework.DependencyInjection; | |
using Microsoft.Framework.DependencyInjection.Extensions; | |
using System; | |
namespace Domain | |
{ | |
// I freely admit that between this class and the nested classes in Subscription... head fuck | |
public static class ServiceExtensions | |
{ | |
/// <summary> | |
/// These services must be added in order to use the EventBus. They only need to be added once, but can safely | |
/// be called multiple times. | |
/// </summary> | |
/// <remarks> | |
/// The default <see cref="NonPersistentInMemoryEventBus"/> implementation can be overriden by adding another | |
/// implementation of <see cref="EventBus"/> before calling this extension, or by adding (not TryAdd) afterwards. | |
/// </remarks> | |
public static IServiceCollection AddEventBus(this IServiceCollection services) | |
{ | |
services.TryAddSingleton<Subscribers, Subscribers>(); | |
services.TryAddSingleton<EventBus, NonPersistentInMemoryEventBus>(); | |
return services; | |
} | |
/// <summary> | |
/// Allows subscriptions to be made as part of the composition graph. Realization and instantiation | |
/// is deferred until the application is executed, which is pretty neat if you ask me... | |
/// </summary> | |
/// <returns> | |
/// A Subscription. Ensure the subscription is held or you will fall | |
/// prey to the Garbage Collector Unsubscribing you seemingly randomly | |
/// | |
/// There are helpers available in the ObservableHelpers assembly to | |
/// hold subscriptions for the lifetime of the Application Domain | |
/// </returns> | |
public static IDisposable AddSubscriber<Subscriber, Event>(this IServiceCollection services) | |
where Subscriber : class, IObserver<Event> | |
{ | |
services.AddTransient<Subscriber, Subscriber>(); | |
return services.GetSubscription<Subscriber, Event>(); | |
} | |
/// <summary> | |
/// Allows subscriptions to be made for a pre-initialised <paramref name="subscriber"/>, whilst preserving | |
/// deferred realization and instantiation of the <see cref="EventBus"/> itself. Also pretty neat... | |
/// </summary> | |
/// <returns> | |
/// A Subscription. Ensure the subscription is held or you will fall | |
/// prey to the Garbage Collector Unsubscribing you seemingly randomly | |
/// | |
/// There are helpers available in the ObservableHelpers assembly to | |
/// hold subscriptions for the lifetime of the Application Domain | |
/// </returns> | |
public static IDisposable AddSubscriber<Event>(this IServiceCollection services, IObserver<Event> subscriber) | |
{ | |
services.AddInstance(subscriber); | |
return services.GetSubscription<IObserver<Event>, Event>(); | |
} | |
private static IDisposable GetSubscription<Subscriber, Event>(this IServiceCollection services) | |
where Subscriber : class, IObserver<Event> | |
{ | |
var subscription = Subscription.For<Event>(); | |
services.AddTransient(provider => GetRealizedSubscription<Subscriber, Event>(provider, subscription)); | |
return subscription; | |
} | |
private static Action<Subscribers> GetRealizedSubscription<Subscriber, Event>(IServiceProvider provider, Subscription.DeferredRealization<Event> subscription) | |
where Subscriber : IObserver<Event> | |
{ | |
return subscribers => | |
{ | |
subscription.Realize(provider.GetRequiredService<Subscriber>()); | |
subscribers.Add(new Domain.Subscriber(typeof(Event), subscription.RealizedService)); | |
}; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
namespace Domain | |
{ | |
internal sealed class Subscriber | |
{ | |
public Subscriber(Type type, object subscriber) | |
{ | |
Type = type; | |
Observer = subscriber; | |
} | |
public Type Type { get; } | |
public object Observer { get; } | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Threading; | |
namespace Domain | |
{ | |
internal sealed class Subscribers | |
{ | |
private readonly List<Subscriber> _subscribers = new List<Subscriber>(); | |
/// <summary> | |
/// Enumeration of <see cref="_subscribers"/> in the <see cref="GetSubscribersFor{T}"/> method | |
/// makes usage of a ConcurrentCollection much more difficult, so I've gone with a simple, manual lock. | |
/// </summary> | |
private readonly ReaderWriterLockSlim _subscriberLock = new ReaderWriterLockSlim(); | |
public IDisposable Add(Subscriber subscriber) | |
{ | |
_subscriberLock.EnterWriteLock(); | |
try | |
{ | |
_subscribers.Add(subscriber); | |
return Subscription.For(subscriber, Remove); | |
} | |
finally | |
{ | |
_subscriberLock.ExitWriteLock(); | |
} | |
} | |
public void Remove(Subscriber subscriber) | |
{ | |
_subscriberLock.EnterWriteLock(); | |
try | |
{ | |
_subscribers.Remove(subscriber); | |
} | |
finally | |
{ | |
_subscriberLock.ExitWriteLock(); | |
} | |
} | |
public IEnumerable<IObserver<T>> GetSubscribersFor<T>() | |
{ | |
var type = typeof(T); | |
var results = new List<IObserver<T>>(); | |
_subscriberLock.EnterReadLock(); | |
try | |
{ | |
foreach (var subscriber in _subscribers) | |
if (subscriber.Type.IsAssignableFrom(type)) | |
results.Add((IObserver<T>)subscriber.Observer); | |
} | |
finally | |
{ | |
_subscriberLock.ExitReadLock(); | |
} | |
return results; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
namespace Domain | |
{ | |
internal class Subscription | |
{ | |
internal static DeferredRealization<Event> For<Event>() | |
{ | |
return new DeferredRealization<Event>(); | |
} | |
internal class DeferredRealization<Event> : IDisposable | |
{ | |
public IDisposable Subscription { get; private set; } | |
public IObserver<Event> RealizedService { get; private set; } | |
public void Realize<Subscriber>(Subscriber subscriber) where Subscriber : IObserver<Event> | |
=> RealizedService = subscriber; | |
public void Subscribe(EventBus bus) | |
=> Subscription = bus.Subscribe(RealizedService); | |
public void Dispose() | |
=> Subscription.Dispose(); | |
} | |
internal static DelegatedDisposal For(Subscriber subscriber, Action<Subscriber> dispose) | |
{ | |
return new DelegatedDisposal(subscriber, dispose); | |
} | |
internal class DelegatedDisposal : IDisposable | |
{ | |
private readonly Action<Subscriber> _dispose; | |
private readonly Subscriber _subscriber; | |
public DelegatedDisposal(Subscriber subscriber, Action<Subscriber> dispose) | |
{ | |
_subscriber = subscriber; | |
_dispose = dispose; | |
} | |
public void Dispose() | |
=> _dispose(_subscriber); | |
} | |
} | |
} |
GC won't "dispose" your subscription.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Until we get this into a repo in a new Gilmond organisation and respective full OSS approval, @shaundodimead has signed off on us taking innovation from the project in the Clean Development Game Engine.