Skip to content

Instantly share code, notes, and snippets.

@kellabyte
Created August 21, 2011 22:56
Show Gist options
  • Save kellabyte/1161301 to your computer and use it in GitHub Desktop.
Save kellabyte/1161301 to your computer and use it in GitHub Desktop.
Making a service bus independent messaging infrastructure
namespace Caliburn.Micro
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
/// <summary>
/// Enables loosely-coupled publication of and subscription to events.
/// </summary>
public interface IEventAggregator
{
/// <summary>
/// Gets or sets the default publication thread marshaller.
/// </summary>
/// <value>
/// The default publication thread marshaller.
/// </value>
Action<System.Action> PublicationThreadMarshaller { get; set; }
/// <summary>
/// Subscribes an instance to all events declared through implementations of <see cref = "IHandle{T}" />
/// </summary>
/// <param name = "instance">The instance to subscribe for event publication.</param>
void Subscribe(object instance);
/// <summary>
/// Unsubscribes the instance from all events.
/// </summary>
/// <param name = "instance">The instance to unsubscribe.</param>
void Unsubscribe(object instance);
/// <summary>
/// Publishes a message.
/// </summary>
/// <param name = "message">The message instance.</param>
/// <remarks>
/// Uses the default thread marshaller during publication.
/// </remarks>
void Publish(object message);
/// <summary>
/// Publishes a message.
/// </summary>
/// <param name = "message">The message instance.</param>
/// <param name = "marshal">Allows the publisher to provide a custom thread marshaller for the message publication.</param>
void Publish(object message, Action<System.Action> marshal);
}
/// <summary>
/// Enables loosely-coupled publication of and subscription to events.
/// </summary>
public class EventAggregator : IEventAggregator
{
/// <summary>
/// The default thread marshaller used for publication;
/// </summary>
public static Action<System.Action> DefaultPublicationThreadMarshaller = action => action();
readonly List<Handler> handlers = new List<Handler>();
/// <summary>
/// Initializes a new instance of the <see cref = "EventAggregator" /> class.
/// </summary>
public EventAggregator()
{
PublicationThreadMarshaller = DefaultPublicationThreadMarshaller;
}
/// <summary>
/// Gets or sets the default publication thread marshaller.
/// </summary>
/// <value>
/// The default publication thread marshaller.
/// </value>
public Action<System.Action> PublicationThreadMarshaller { get; set; }
/// <summary>
/// Subscribes an instance to all events declared through implementations of <see cref = "IHandle{T}" />
/// </summary>
/// <param name = "instance">The instance to subscribe for event publication.</param>
public virtual void Subscribe(object instance)
{
lock (handlers)
{
if (handlers.Any(x => x.Matches(instance)))
return;
handlers.Add(new Handler(instance));
}
}
/// <summary>
/// Unsubscribes the instance from all events.
/// </summary>
/// <param name = "instance">The instance to unsubscribe.</param>
public virtual void Unsubscribe(object instance)
{
lock (handlers)
{
var found = handlers.FirstOrDefault(x => x.Matches(instance));
if (found != null)
handlers.Remove(found);
}
}
/// <summary>
/// Publishes a message.
/// </summary>
/// <param name = "message">The message instance.</param>
/// <remarks>
/// Does not marshall the the publication to any special thread by default.
/// </remarks>
public virtual void Publish(object message)
{
Publish(message, PublicationThreadMarshaller);
}
/// <summary>
/// Publishes a message.
/// </summary>
/// <param name = "message">The message instance.</param>
/// <param name = "marshal">Allows the publisher to provide a custom thread marshaller for the message publication.</param>
public virtual void Publish(object message, Action<System.Action> marshal)
{
Handler[] toNotify;
lock (handlers)
toNotify = handlers.ToArray();
marshal(() =>
{
var messageType = message.GetType();
var dead = toNotify
.Where(handler => !handler.Handle(messageType, message))
.ToList();
if (dead.Any())
{
lock (handlers)
{
dead.Apply(x => handlers.Remove(x));
}
}
});
}
protected class Handler
{
readonly WeakReference reference;
readonly Dictionary<Type, MethodInfo> supportedHandlers = new Dictionary<Type, MethodInfo>();
public Handler(object handler)
{
reference = new WeakReference(handler);
var interfaces = handler.GetType().GetInterfaces()
.Where(x => typeof(IHandle).IsAssignableFrom(x) && x.IsGenericType);
foreach (var @interface in interfaces)
{
var type = @interface.GetGenericArguments()[0];
var method = @interface.GetMethod("Handle");
supportedHandlers[type] = method;
}
}
public bool Matches(object instance)
{
return reference.Target == instance;
}
public bool Handle(Type messageType, object message)
{
var target = reference.Target;
if (target == null)
return false;
foreach (var pair in supportedHandlers)
{
if (pair.Key.IsAssignableFrom(messageType))
{
pair.Value.Invoke(target, new[] { message });
return true;
}
}
return true;
}
}
}
}
using System;
using MassTransit;
using MassTransit.Events;
namespace Elasticity.Transports
{
public class MassTransitBus : IBus
{
private string queue = string.Empty;
public MassTransitBus(string queue)
{
this.queue = queue;
}
public void Initialize()
{
Bus.Initialize(sbc =>
{
sbc.UseMsmq();
sbc.VerifyMsmqConfiguration();
sbc.UseMulticastSubscriptionClient();
//sbc.ReceiveFrom("msmq://localhost/test_queue");
sbc.ReceiveFrom(queue);
//sbc.Subscribe(subs =>
//{
// subs.Handler<YourMessage>(msg => Console.WriteLine(msg.Text));
//});
});
}
public void Dispose()
{
// TODO: Implement IDisposable pattern.
}
public void Subscribe(Type messageType)
{
// TODO: Implement MassTransit subscribe.
}
public void Unsubscribe(Type messageType)
{
// TODO: Implement MassTransit unsubscribe.
}
public void Publish(object message)
{
// TODO: Implement MassTransit publish.
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
namespace Elasticity.Events
{
public class ServiceBus : EventAggregator
{
private readonly IBus bus = null;
public ServiceBus(IBus bus)
{
this.bus = bus;
}
public override void Publish(object message)
{
base.Publish(message);
bus.Publish(message);
}
public override void Publish(object message, Action<Action> marshal)
{
base.Publish(message, marshal);
bus.Publish(message);
}
public override void Subscribe(object instance)
{
base.Subscribe(instance);
var interfaces = instance.GetType().GetInterfaces()
.Where(x => typeof(IHandle).IsAssignableFrom(x) && x.IsGenericType);
foreach (var @interface in interfaces)
{
var messageType = @interface.GetGenericArguments()[0];
bus.Subscribe(messageType);
}
}
public override void Unsubscribe(object instance)
{
base.Unsubscribe(instance);
var interfaces = instance.GetType().GetInterfaces()
.Where(x => typeof(IHandle).IsAssignableFrom(x) && x.IsGenericType);
foreach (var @interface in interfaces)
{
var messageType = @interface.GetGenericArguments()[0];
bus.Unsubscribe(messageType);
}
}
}
}
using System;
using Elasticity;
using Elasticity.Events;
namespace Elasticity.Test
{
public class TestHandler : IHandle<SomeMessage>
{
public Handle(SomeMessage message)
{
Console.WriteLine(message.Text);
}
}
class Program
{
private static MassTransitBus bus = null;
private static TestHandler handler = null;
static void Main(string[] args)
{
bus = new MassTransitBus("msmq://localhost/MassTransitSample2");
bus.Initialize();
TestHandler handler = new TestHandler();
ServiceBus serviceBus = new ServiceBus(bus);
serviceBus .Subscribe(handler);
serviceBus.Publish(new SomeMessage("Hello World!");
Console.WriteLine("Press any key to exit...");
Console.ReadKey();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment