Created
August 21, 2011 22:56
-
-
Save kellabyte/1161301 to your computer and use it in GitHub Desktop.
Making a service bus independent messaging infrastructure
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace Caliburn.Micro | |
{ | |
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Reflection; | |
/// <summary>
///   Contract for a message hub through which publishers and subscribers communicate
///   without holding references to each other.
/// </summary>
public interface IEventAggregator
{
    /// <summary>
    ///   Gets or sets the marshaller used by <see cref = "Publish(object)" /> when the
    ///   publisher does not supply one.
    /// </summary>
    /// <value>
    ///   A delegate that receives the publication work item and decides how to run it.
    /// </value>
    Action<System.Action> PublicationThreadMarshaller { get; set; }

    /// <summary>
    ///   Registers an instance for every message type it declares through
    ///   implementations of <see cref = "IHandle{T}" />.
    /// </summary>
    /// <param name = "instance">The subscriber to register.</param>
    void Subscribe(object instance);

    /// <summary>
    ///   Removes a previously registered instance from all message types.
    /// </summary>
    /// <param name = "instance">The subscriber to remove.</param>
    void Unsubscribe(object instance);

    /// <summary>
    ///   Publishes a message using the default thread marshaller.
    /// </summary>
    /// <param name = "message">The message instance.</param>
    void Publish(object message);

    /// <summary>
    ///   Publishes a message using a marshaller chosen by the publisher.
    /// </summary>
    /// <param name = "message">The message instance.</param>
    /// <param name = "marshal">The thread marshaller to use for this publication only.</param>
    void Publish(object message, Action<System.Action> marshal);
}
/// <summary>
///   Enables loosely-coupled publication of and subscription to events.
/// </summary>
/// <remarks>
///   Subscribers are tracked through weak references, so registering an instance does
///   not keep it alive. Entries whose subscriber has been collected are pruned the next
///   time a message is published.
/// </remarks>
public class EventAggregator : IEventAggregator
{
    /// <summary>
    ///   The default thread marshaller used for publication; it runs the action
    ///   directly on the calling thread.
    /// </summary>
    public static Action<System.Action> DefaultPublicationThreadMarshaller = action => action();

    readonly List<Handler> handlers = new List<Handler>();

    /// <summary>
    ///   Initializes a new instance of the <see cref = "EventAggregator" /> class using
    ///   <see cref = "DefaultPublicationThreadMarshaller" /> as its marshaller.
    /// </summary>
    public EventAggregator()
    {
        PublicationThreadMarshaller = DefaultPublicationThreadMarshaller;
    }

    /// <summary>
    ///   Gets or sets the default publication thread marshaller.
    /// </summary>
    /// <value>
    ///   The default publication thread marshaller.
    /// </value>
    public Action<System.Action> PublicationThreadMarshaller { get; set; }

    /// <summary>
    ///   Subscribes an instance to all events declared through implementations of <see cref = "IHandle{T}" />.
    ///   Subscribing an instance that is already registered has no effect.
    /// </summary>
    /// <param name = "instance">The instance to subscribe for event publication.</param>
    /// <exception cref = "ArgumentNullException"><paramref name = "instance" /> is null.</exception>
    public virtual void Subscribe(object instance)
    {
        if (instance == null)
        {
            throw new ArgumentNullException("instance");
        }

        lock (handlers)
        {
            if (handlers.Any(x => x.Matches(instance)))
            {
                return;
            }

            handlers.Add(new Handler(instance));
        }
    }

    /// <summary>
    ///   Unsubscribes the instance from all events. Does nothing when the instance is not registered.
    /// </summary>
    /// <param name = "instance">The instance to unsubscribe.</param>
    public virtual void Unsubscribe(object instance)
    {
        lock (handlers)
        {
            var found = handlers.FirstOrDefault(x => x.Matches(instance));

            if (found != null)
            {
                handlers.Remove(found);
            }
        }
    }

    /// <summary>
    ///   Publishes a message.
    /// </summary>
    /// <param name = "message">The message instance.</param>
    /// <remarks>
    ///   Uses <see cref = "PublicationThreadMarshaller" />, which by default does not
    ///   marshal the publication to any special thread.
    /// </remarks>
    public virtual void Publish(object message)
    {
        Publish(message, PublicationThreadMarshaller);
    }

    /// <summary>
    ///   Publishes a message.
    /// </summary>
    /// <param name = "message">The message instance.</param>
    /// <param name = "marshal">Allows the publisher to provide a custom thread marshaller for the message publication.</param>
    /// <exception cref = "ArgumentNullException">
    ///   <paramref name = "message" /> or <paramref name = "marshal" /> is null.
    /// </exception>
    public virtual void Publish(object message, Action<System.Action> marshal)
    {
        if (message == null)
        {
            throw new ArgumentNullException("message");
        }

        if (marshal == null)
        {
            throw new ArgumentNullException("marshal");
        }

        // Snapshot the subscriber list under the lock so that the handlers themselves
        // run outside of it and may subscribe/unsubscribe without deadlocking.
        Handler[] toNotify;
        lock (handlers)
        {
            toNotify = handlers.ToArray();
        }

        var messageType = message.GetType();

        marshal(() =>
        {
            // Handle returns false only when the subscriber has been garbage collected.
            var dead = toNotify
                .Where(handler => !handler.Handle(messageType, message))
                .ToList();

            if (dead.Count == 0)
            {
                return;
            }

            lock (handlers)
            {
                foreach (var handler in dead)
                {
                    handlers.Remove(handler);
                }
            }
        });
    }

    /// <summary>
    ///   Weakly references one subscriber together with the <c>Handle</c> methods of
    ///   the generic handler interfaces it implements, keyed by message type.
    /// </summary>
    protected class Handler
    {
        readonly WeakReference reference;
        readonly Dictionary<Type, MethodInfo> supportedHandlers = new Dictionary<Type, MethodInfo>();

        /// <summary>
        ///   Records the handler methods exposed by <paramref name = "handler" />.
        /// </summary>
        /// <param name = "handler">The subscriber instance; must not be null.</param>
        public Handler(object handler)
        {
            reference = new WeakReference(handler);

            var interfaces = handler.GetType().GetInterfaces()
                .Where(x => typeof(IHandle).IsAssignableFrom(x) && x.IsGenericType);

            foreach (var @interface in interfaces)
            {
                var type = @interface.GetGenericArguments()[0];
                var method = @interface.GetMethod("Handle");
                supportedHandlers[type] = method;
            }
        }

        /// <summary>
        ///   Determines whether this entry wraps the given instance (reference equality).
        /// </summary>
        /// <param name = "instance">The instance to compare against the tracked subscriber.</param>
        /// <returns>true when the tracked subscriber is the same object as <paramref name = "instance" />.</returns>
        public bool Matches(object instance)
        {
            return reference.Target == instance;
        }

        /// <summary>
        ///   Delivers the message to every handler method whose declared message type
        ///   is assignable from <paramref name = "messageType" />.
        /// </summary>
        /// <param name = "messageType">The runtime type of the message.</param>
        /// <param name = "message">The message to deliver.</param>
        /// <returns>
        ///   false when the subscriber has been garbage collected and this entry should be
        ///   removed; otherwise true, even if no handler method matched.
        /// </returns>
        public bool Handle(Type messageType, object message)
        {
            var target = reference.Target;

            if (target == null)
            {
                return false;
            }

            foreach (var pair in supportedHandlers)
            {
                // Do not stop at the first match: a subscriber may implement several
                // compatible handler interfaces (e.g. for a base type and a derived
                // type) and each of them is entitled to see the message.
                if (pair.Key.IsAssignableFrom(messageType))
                {
                    pair.Value.Invoke(target, new[] { message });
                }
            }

            return true;
        }
    }
}
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using MassTransit; | |
using MassTransit.Events; | |
namespace Elasticity.Transports | |
{ | |
/// <summary>
///   <see cref = "IBus" /> implementation that configures MassTransit to receive from a
///   single queue.
/// </summary>
/// <remarks>
///   Only <see cref = "Initialize" /> does any work at the moment; <see cref = "Subscribe" />,
///   <see cref = "Unsubscribe" />, <see cref = "Publish" /> and <see cref = "Dispose" /> are
///   placeholders that currently do nothing.
/// </remarks>
public class MassTransitBus : IBus
{
    private readonly string queue;

    /// <summary>
    ///   Creates a bus bound to the given queue address.
    /// </summary>
    /// <param name = "queue">The address passed to <c>ReceiveFrom</c> during initialization.</param>
    /// <exception cref = "ArgumentException"><paramref name = "queue" /> is null or empty.</exception>
    public MassTransitBus(string queue)
    {
        // Fail here rather than later inside Bus.Initialize, where the cause is harder to trace.
        if (string.IsNullOrEmpty(queue))
        {
            throw new ArgumentException("A queue address is required.", "queue");
        }

        this.queue = queue;
    }

    /// <summary>
    ///   Initializes the MassTransit bus: selects MSMQ, verifies the MSMQ configuration,
    ///   enables the multicast subscription client and sets the receive queue.
    /// </summary>
    public void Initialize()
    {
        Bus.Initialize(sbc =>
        {
            sbc.UseMsmq();
            sbc.VerifyMsmqConfiguration();
            sbc.UseMulticastSubscriptionClient();
            sbc.ReceiveFrom(queue);

            // No message subscriptions are registered here; see Subscribe below.
        });
    }

    /// <summary>
    ///   Placeholder; currently releases nothing.
    /// </summary>
    public void Dispose()
    {
        // TODO: Implement IDisposable pattern.
    }

    /// <summary>
    ///   Placeholder; currently does nothing.
    /// </summary>
    /// <param name = "messageType">The message type to subscribe to.</param>
    public void Subscribe(Type messageType)
    {
        // TODO: Implement MassTransit subscribe.
    }

    /// <summary>
    ///   Placeholder; currently does nothing.
    /// </summary>
    /// <param name = "messageType">The message type to unsubscribe from.</param>
    public void Unsubscribe(Type messageType)
    {
        // TODO: Implement MassTransit unsubscribe.
    }

    /// <summary>
    ///   Placeholder; currently does nothing, so the message is not sent anywhere.
    /// </summary>
    /// <param name = "message">The message to publish.</param>
    public void Publish(object message)
    {
        // TODO: Implement MassTransit publish.
    }
}
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Reflection; | |
namespace Elasticity.Events | |
{ | |
/// <summary>
///   An <see cref = "EventAggregator" /> that also forwards published messages and
///   subscription changes to an <see cref = "IBus" />.
/// </summary>
public class ServiceBus : EventAggregator
{
    private readonly IBus bus;

    /// <summary>
    ///   Creates a service bus that forwards to the given transport.
    /// </summary>
    /// <param name = "bus">The transport that receives published messages and subscription changes.</param>
    /// <exception cref = "ArgumentNullException"><paramref name = "bus" /> is null.</exception>
    public ServiceBus(IBus bus)
    {
        if (bus == null)
        {
            throw new ArgumentNullException("bus");
        }

        this.bus = bus;
    }

    /// <summary>
    ///   Publishes a message to local subscribers and to the bus, using the default
    ///   thread marshaller for the local part.
    /// </summary>
    /// <param name = "message">The message instance.</param>
    public override void Publish(object message)
    {
        // The base implementation forwards to the virtual two-argument overload, which
        // resolves to the override below and already sends the message to the bus.
        // Calling bus.Publish here as well would send every message twice.
        base.Publish(message);
    }

    /// <summary>
    ///   Publishes a message to local subscribers through <paramref name = "marshal" />
    ///   and then to the bus.
    /// </summary>
    /// <param name = "message">The message instance.</param>
    /// <param name = "marshal">The thread marshaller for the local publication.</param>
    public override void Publish(object message, Action<Action> marshal)
    {
        base.Publish(message, marshal);
        bus.Publish(message);
    }

    /// <summary>
    ///   Subscribes the instance locally and subscribes the bus to every message type
    ///   the instance handles.
    /// </summary>
    /// <param name = "instance">The instance to subscribe.</param>
    /// <exception cref = "ArgumentNullException"><paramref name = "instance" /> is null.</exception>
    public override void Subscribe(object instance)
    {
        if (instance == null)
        {
            throw new ArgumentNullException("instance");
        }

        base.Subscribe(instance);

        foreach (var messageType in GetHandledMessageTypes(instance))
        {
            bus.Subscribe(messageType);
        }
    }

    /// <summary>
    ///   Unsubscribes the instance locally and unsubscribes the bus from every message
    ///   type the instance handles.
    /// </summary>
    /// <param name = "instance">The instance to unsubscribe.</param>
    /// <exception cref = "ArgumentNullException"><paramref name = "instance" /> is null.</exception>
    public override void Unsubscribe(object instance)
    {
        if (instance == null)
        {
            throw new ArgumentNullException("instance");
        }

        base.Unsubscribe(instance);

        // NOTE(review): the bus is unsubscribed from a message type even when other
        // local subscribers still handle it - confirm whether IBus is expected to
        // reference-count subscriptions.
        foreach (var messageType in GetHandledMessageTypes(instance))
        {
            bus.Unsubscribe(messageType);
        }
    }

    /// <summary>
    ///   Returns the generic argument of every generic handler interface implemented
    ///   by the instance's type.
    /// </summary>
    private static IEnumerable<Type> GetHandledMessageTypes(object instance)
    {
        return instance.GetType().GetInterfaces()
            .Where(x => typeof(IHandle).IsAssignableFrom(x) && x.IsGenericType)
            .Select(x => x.GetGenericArguments()[0]);
    }
}
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using Elasticity; | |
using Elasticity.Events; | |
namespace Elasticity.Test | |
{ | |
/// <summary>
///   Sample subscriber that writes the text of each received
///   <see cref = "SomeMessage" /> to the console.
/// </summary>
public class TestHandler : IHandle<SomeMessage>
{
    /// <summary>
    ///   Writes the message text to standard output.
    /// </summary>
    /// <param name = "message">The received message.</param>
    public void Handle(SomeMessage message)
    {
        Console.WriteLine(message.Text);
    }
}
/// <summary>
///   Console sample: wires a <see cref = "TestHandler" /> to a <see cref = "ServiceBus" />
///   backed by MassTransit and publishes one message.
/// </summary>
class Program
{
    // MassTransitBus is declared in Elasticity.Transports, which this file does not
    // import, so the type is referenced by its full name.
    private static Elasticity.Transports.MassTransitBus bus = null;

    // Held in a static field on purpose: the aggregator only keeps weak references to
    // its subscribers, so something else has to keep the handler alive.
    private static TestHandler handler = null;

    /// <summary>
    ///   Entry point.
    /// </summary>
    /// <param name = "args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        bus = new Elasticity.Transports.MassTransitBus("msmq://localhost/MassTransitSample2");

        try
        {
            bus.Initialize();

            // Assign the static field rather than declaring a local that shadows it.
            handler = new TestHandler();

            ServiceBus serviceBus = new ServiceBus(bus);
            serviceBus.Subscribe(handler);
            serviceBus.Publish(new SomeMessage("Hello World!"));

            Console.WriteLine("Press any key to exit...");
            Console.ReadKey();
        }
        finally
        {
            bus.Dispose();
        }
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment