@digitalBush
Created March 14, 2013 02:15
SignalR scale out via RabbitMQ and ServiceStack.Text
using System;
using Microsoft.AspNet.SignalR;
using Microsoft.AspNet.SignalR.Messaging;
using RabbitMQ.Client;

namespace SignalR.RabbitMQ {
    public static class DependencyResolverExtensions {
        public static IDependencyResolver UseRabbitMq(this IDependencyResolver resolver, ConnectionFactory connectionFactory) {
            if (connectionFactory == null) {
                throw new ArgumentNullException("connectionFactory");
            }
            resolver.Register(typeof(IMessageBus), () => new RabbitMQBackplane(connectionFactory, resolver));
            return resolver;
        }
    }
}
using System;
using System.IO;
using RabbitMQ.Client;
using ServiceStack.Text;

namespace SignalR.RabbitMQ {
    public class MessageConsumer<T> : DefaultBasicConsumer {
        private readonly Action<T> _action;

        public MessageConsumer(Action<T> action) {
            _action = action;
        }

        public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body) {
            using (var stream = new MemoryStream(body)) {
                var obj = JsonSerializer.DeserializeFromStream<T>(stream);
                _action(obj);
            }
        }
    }
}
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNet.SignalR;
using Microsoft.AspNet.SignalR.Messaging;
using RabbitMQ.Client;
using ServiceStack.Text;
using IConnection = RabbitMQ.Client.IConnection;

namespace SignalR.RabbitMQ {
    public class RabbitMQBackplane : ScaleoutMessageBus {
        private const string Exchange = "signalR";
        private readonly IModel _sendChannel;
        private readonly IConnection _connection;
        // An IModel must not be used by several threads at once, so publishes go through this lock.
        private readonly object _sendLock = new object();
        private ulong _id = 0; //TODO: Not sure what to do here...

        public RabbitMQBackplane(ConnectionFactory connectionFactory, IDependencyResolver resolver) : base(resolver) {
            _connection = connectionFactory.CreateConnection();
            _sendChannel = _connection.CreateModel();
            // Durable fanout exchange: every bound queue receives every published batch.
            _sendChannel.ExchangeDeclare(Exchange, ExchangeType.Fanout, true, false, null);
            var receiveChannel = _connection.CreateModel();
            // One queue per node: non-durable, exclusive, auto-delete.
            var queue = receiveChannel.QueueDeclare("signalR-" + Guid.NewGuid(), false, true, true, null);
            receiveChannel.QueueBind(queue.QueueName, Exchange, "");
            var consumer = new MessageConsumer<IList<Message>>(messages => OnReceived("0", _id++, messages));
            // noAck = true: deliveries are acknowledged automatically.
            receiveChannel.BasicConsume(queue.QueueName, true, consumer);
        }

        protected override Task Send(IList<Message> messages) {
            return Task.Factory.StartNew(msgs => {
                using (var stream = new MemoryStream()) {
                    JsonSerializer.SerializeToStream(msgs, stream);
                    // Send tasks may run concurrently; serialize access to the shared channel.
                    lock (_sendLock) {
                        _sendChannel.BasicPublish(Exchange, "", null, stream.ToArray());
                    }
                }
            }, messages);
        }

        protected override void Dispose(bool disposing) {
            if (disposing) {
                _connection.Abort();
                _connection.Dispose();
            }
            base.Dispose(disposing);
        }
    }
}
@digitalBush
Author

I would love to know what to do with those first two parameters to OnReceived...
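
One possible reading of those two parameters, offered as an assumption rather than as documented SignalR behaviour: the first names a stream and the second is a position in that stream that only ever increases, and every node should see the same pair for a given batch. A counter that each receiver increments locally (like `_id++` above) cannot guarantee that. The sketch below stamps each batch on the publishing side instead, so the pair travels with the payload. `ScaleoutEnvelope` and `EnvelopeStamper` are hypothetical names, not part of SignalR or RabbitMQ.Client, and whether SignalR copes well with one stream per publishing node is untested here.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical wire format: the batch plus the stream id and position assigned by the sender.
public sealed class ScaleoutEnvelope<TMessage>
{
    public string StreamId { get; set; }
    public ulong Id { get; set; }
    public IList<TMessage> Messages { get; set; }
}

// Hypothetical helper: one instance per publishing process.
public sealed class EnvelopeStamper
{
    private readonly string _streamId;
    private long _lastId; // incremented atomically; the first stamped id is 1

    public EnvelopeStamper(string streamId)
    {
        if (string.IsNullOrEmpty(streamId)) throw new ArgumentException("streamId is required", "streamId");
        _streamId = streamId;
    }

    public ScaleoutEnvelope<TMessage> Stamp<TMessage>(IList<TMessage> messages)
    {
        if (messages == null) throw new ArgumentNullException("messages");
        long id = Interlocked.Increment(ref _lastId); // safe when Send runs on several threads
        return new ScaleoutEnvelope<TMessage>
        {
            StreamId = _streamId,
            Id = (ulong)id,
            Messages = messages
        };
    }
}
```

With something like this, `Send` would serialize `stamper.Stamp(messages)` instead of the bare list, and the consumer callback would call `OnReceived(envelope.StreamId, envelope.Id, envelope.Messages)`. Using a fresh stream id per process (for example a new Guid) would avoid reusing ids after a restart; that too is an assumption.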

@digitalBush
Author

I think the biggest challenge is recovering from faulted connections. I'm not really sure of the best way to go about that.

@karlgrz

karlgrz commented Mar 14, 2013

Not sure how much this helps, since your environment may be different, but the way we approached it was to abstract the connection creation and try to get a new connection whenever a message is received, similar to this:

using System.Diagnostics; // Debugger
using RabbitMQ.Client;    // ConnectionFactory, IConnection

internal class RabbitMQConnectionFactory
{
    public RabbitMQConnectionFactory(BusConfig config, ServerConfig server)
    {
        this.ConnectionFactory = new ConnectionFactory();

        if (!string.IsNullOrEmpty(server.VirtualHost))
            this.ConnectionFactory.VirtualHost = server.VirtualHost;

        if (config.HeartbeatSeconds != null)
            this.ConnectionFactory.RequestedHeartbeat = config.HeartbeatSeconds.Value;

        // Disable heartbeats (0) while a debugger is attached, otherwise the timeouts will drive you mad...
        if (Debugger.IsAttached)
            this.ConnectionFactory.RequestedHeartbeat = 0;

        this.ConnectionFactory.HostName = server.Host;
        this.ConnectionFactory.Port = server.Port;
        this.ConnectionFactory.UserName = server.UserName;
        this.ConnectionFactory.Password = server.Password;
    }

    private ConnectionFactory ConnectionFactory
    {
        get;
        set;
    }

    public IConnection GetConnection()
    {
        return this.ConnectionFactory.CreateConnection();
    }
}

Then we implement our Bus like this:

public class Bus : IBus
{
    public Bus(IBusBuilder builder)
    {
        Contract.ArgumentNotNull("builder", builder);

        this.MessageSender = builder.ToMessageSender();

        this.ConnectionFactory = new RabbitMQConnectionFactory(this.MessageSender.Config, this.MessageSender.Config.Servers[0]);
        this.Subscribe = new SubscriptionFactory(this.MessageSender);
    }

    public SubscriptionFactory Subscribe
    {
        get;
        private set;
    }

    public BusConfig Config
    {
        get { return this.MessageSender.Config; }
    }

    private MessageSender MessageSender
    {
        get;
        set;
    }

    private RabbitMQConnectionFactory ConnectionFactory
    {
        get;
        set;
    }

    ...snip...

    public void Send(IEnumerable<object> messages)
    {
        using (var connection = this.ConnectionFactory.GetConnection())
        using (var model = connection.CreateModel())
            this.MessageSender.Send(model, messages);
    }

    ...snip...
}

When we run our service host, we spawn X threads running a consumer that implements this Bus, and listens to whichever queues / exchanges the consumer is configured to listen to.

If the connection fails, our service host is set up to continuously retry. Fail fast, get out, try again. Might not be the cleanest but it's a hell of a lot more straightforward than trying to do connection gymnastics.
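
The "fail fast, try again" loop described above could look roughly like the sketch below. It is not karlgrz's actual service host code: the `Retry` class, the attempt limit and the doubling delay are all assumptions added for illustration. The `connect` delegate stands for whatever opens the connection, for example `() => factory.CreateConnection()`.

```csharp
using System;
using System.Threading;

// Hypothetical helper: retries an operation until it succeeds or the attempts run out.
public static class Retry
{
    public static T UntilSuccess<T>(
        Func<T> connect,
        int maxAttempts,
        TimeSpan initialDelay,
        TimeSpan maxDelay,
        Action<TimeSpan> sleep = null) // injectable so the loop can be exercised without waiting
    {
        if (connect == null) throw new ArgumentNullException("connect");
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException("maxAttempts");
        if (sleep == null) sleep = d => Thread.Sleep(d);

        TimeSpan delay = initialDelay;
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return connect();
            }
            catch (Exception)
            {
                // Give up and surface the last failure once the attempts are used up.
                if (attempt >= maxAttempts) throw;

                sleep(delay);

                // Double the wait each time, but never beyond maxDelay.
                long nextTicks = Math.Min(delay.Ticks * 2, maxDelay.Ticks);
                delay = TimeSpan.FromTicks(nextTicks);
            }
        }
    }
}
```

A caller might write `var connection = Retry.UntilSuccess(() => factory.CreateConnection(), 10, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(30));`. Catching every exception type is deliberately crude here; a real host would probably retry only on broker or network failures.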

Also, scaling this out is trivial: we just need to spin up more threads of our service host, whether that's on the same instance or on new instances. Hope that helps, or at least triggers something!

@derekgreer

I really wrestled with the .NET RabbitMQ.Client API on monitoring shutdowns. They've got some poor design/buggy behavior with respect to their ConnectionShutdown event. I ended up re-establishing connections on one of their internal SessionShutdown events; upon reconnect, I re-establish the bus's previous subscriptions and push queued publications.
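
The "push queued publications upon reconnect" half of that approach might be sketched as follows. This is not derekgreer's code: `BufferedPublisher` and its method names are hypothetical, the `send` delegate stands for whatever publishes on the current channel, and hooking `OnDisconnected` and `OnReconnected` up to the client's shutdown events is left out. Re-establishing subscriptions is not shown.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: holds publications while disconnected and replays them in order on reconnect.
public sealed class BufferedPublisher
{
    private readonly object _gate = new object();
    private readonly Queue<byte[]> _pending = new Queue<byte[]>();
    private Action<byte[]> _send; // null while there is no usable connection

    public int PendingCount
    {
        get { lock (_gate) { return _pending.Count; } }
    }

    public void Publish(byte[] body)
    {
        if (body == null) throw new ArgumentNullException("body");
        lock (_gate)
        {
            if (_send == null)
            {
                _pending.Enqueue(body); // keep it until the connection comes back
                return;
            }
            _send(body);
        }
    }

    // Call when the connection is known to be down.
    public void OnDisconnected()
    {
        lock (_gate) { _send = null; }
    }

    // Call with a delegate bound to the new channel once the connection is back.
    public void OnReconnected(Action<byte[]> send)
    {
        if (send == null) throw new ArgumentNullException("send");
        lock (_gate)
        {
            _send = send;
            while (_pending.Count > 0)
            {
                _send(_pending.Dequeue()); // replay in the original order
            }
        }
    }
}
```

Holding the lock while sending also keeps publishes from overlapping on one channel. The queue here is unbounded and a failing `send` is not handled, so a production version would need a size limit and error handling.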

On the issue of grabbing a new connection with each publish, that's not the recommended approach and is really going to be a bottleneck for nodes issuing many messages. That's what channels are for.
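
To make the "that's what channels are for" point concrete, here is a minimal sketch of one long-lived connection shared by all publishers, with one channel per thread. `SharedConnection` is a hypothetical helper written against generic type parameters so it does not depend on any particular client version; with RabbitMQ.Client the type arguments would presumably be `IConnection` and `IModel`.

```csharp
using System;
using System.Threading;

// Hypothetical helper: opens the connection once and gives each thread its own channel.
public sealed class SharedConnection<TConnection, TChannel> : IDisposable
    where TConnection : IDisposable
    where TChannel : IDisposable
{
    private readonly Lazy<TConnection> _connection;
    private readonly ThreadLocal<TChannel> _channel;

    public SharedConnection(Func<TConnection> connect, Func<TConnection, TChannel> openChannel)
    {
        if (connect == null) throw new ArgumentNullException("connect");
        if (openChannel == null) throw new ArgumentNullException("openChannel");

        // The connection is created on first use, and only once even under contention.
        _connection = new Lazy<TConnection>(connect, LazyThreadSafetyMode.ExecutionAndPublication);

        // Each thread lazily opens its own channel on the shared connection.
        _channel = new ThreadLocal<TChannel>(() => openChannel(_connection.Value), true);
    }

    // The calling thread's channel.
    public TChannel Channel
    {
        get { return _channel.Value; }
    }

    public void Dispose()
    {
        // Close every channel that was opened, then the connection itself.
        foreach (TChannel channel in _channel.Values)
        {
            channel.Dispose();
        }
        _channel.Dispose();
        if (_connection.IsValueCreated)
        {
            _connection.Value.Dispose();
        }
    }
}
```

Usage with the client calls already shown in this gist might be `var shared = new SharedConnection<IConnection, IModel>(() => factory.CreateConnection(), c => c.CreateModel());` followed by `shared.Channel.BasicPublish(Exchange, "", null, body);`. On thread-pool threads this can leave many idle channels open, so a small channel pool may suit some workloads better.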