Skip to content

Instantly share code, notes, and snippets.

@meisinger
Created June 3, 2014 04:21
Show Gist options
  • Save meisinger/ba6a122cb068e64efb0f to your computer and use it in GitHub Desktop.
Save meisinger/ba6a122cb068e64efb0f to your computer and use it in GitHub Desktop.
factory class to create a connection to rabbit mq along with a consumer and publisher
using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;
/// <summary>
/// Owns a single RabbitMQ connection and hands out publishers and subscribers,
/// each backed by its own channel created on that connection.
/// </summary>
public class MessageQueueFactory : IMessageQueueFactory
{
    private readonly ConnectionFactory factory;
    private IConnection connection;

    /// <summary>
    /// True when a connection has been created and it reports itself as open.
    /// </summary>
    public bool IsOpen
    {
        get { return (connection != null && connection.IsOpen); }
    }

    /// <summary>
    /// Prepares (but does not open) a connection to the given host, using the
    /// RabbitMQ client's default user, password, virtual host, protocol and port.
    /// </summary>
    /// <param name="host">Host name of the RabbitMQ broker.</param>
    public MessageQueueFactory(string host)
    {
        factory = new ConnectionFactory
        {
            HostName = host,
            UserName = ConnectionFactory.DefaultUser,
            Password = ConnectionFactory.DefaultPass,
            VirtualHost = ConnectionFactory.DefaultVHost,
            Protocol = Protocols.DefaultProtocol,
            Port = AmqpTcpEndpoint.UseDefaultPort
        };
    }

    /// <summary>
    /// Closes the connection if it is still open and forgets it.
    /// Safe to call more than once and never throws for an already-closed connection.
    /// </summary>
    public void Dispose()
    {
        // Detach first so a second Dispose() (or a later IsOpen) sees no connection.
        var current = connection;
        connection = null;

        if (current == null || !current.IsOpen)
            return;

        try
        {
            current.Close();
        }
        catch (AlreadyClosedException)
        {
            // The connection went away between the IsOpen check and Close();
            // there is nothing left to release.
        }
        catch (IOException)
        {
            // Best-effort shutdown: a socket failure while closing must not
            // escape from Dispose().
        }
    }

    /// <summary>
    /// Opens the connection to the configured host. Does nothing when a
    /// connection is already open, so existing channels are never orphaned.
    /// </summary>
    /// <exception cref="IOException">The broker could not be reached.</exception>
    public void OpenConnection()
    {
        // Replacing an open connection would leak it together with every
        // publisher/subscriber channel that was created on it.
        if (IsOpen)
            return;

        try
        {
            connection = factory.CreateConnection();
        }
        catch (BrokerUnreachableException ex)
        {
            throw new IOException(
                string.Format("Unable to open connection to RabbitMQ Host \"{0}\".",
                    factory.HostName), ex);
        }
    }

    /// <summary>
    /// Creates a publisher that uses a new channel on the open connection.
    /// </summary>
    /// <exception cref="IOException">The connection is not open.</exception>
    public IMessageQueuePublisher CreatePublisher()
    {
        return new MessageQueuePublisher(CreateChannel());
    }

    /// <summary>
    /// Creates a subscriber that uses a new channel on the open connection.
    /// </summary>
    /// <exception cref="IOException">The connection is not open.</exception>
    public IMessageQueueSubscriber CreateSubscriber()
    {
        return new MessageQueueSubscriber(CreateChannel());
    }

    // Shared guard + channel creation for CreatePublisher/CreateSubscriber.
    private IModel CreateChannel()
    {
        if (connection == null || !connection.IsOpen)
            throw new IOException("RabbitMQ connection is not open or has been closed.");

        return connection.CreateModel();
    }
}
/// <summary>
/// Consumes messages from a queue over the channel supplied at construction and
/// acknowledges or rejects each one according to a caller-supplied handler.
/// </summary>
public class MessageQueueSubscriber : IMessageQueueSubscriber
{
    private readonly IModel channel;

    /// <param name="channel">Channel used for all consume/ack/reject calls.</param>
    public MessageQueueSubscriber(IModel channel)
    {
        this.channel = channel;
    }

    /// <summary>
    /// Disposes the channel when it is still open.
    /// </summary>
    public void Dispose()
    {
        if (channel.IsOpen)
            channel.Dispose();
    }

    /// <summary>
    /// Consumes from <paramref name="queue"/> with a prefetch count of 1.
    /// </summary>
    public void Consume(string queue, Func<IQueueMessage, bool> func, CancellationToken token)
    {
        Consume(queue, 1, func, token);
    }

    /// <summary>
    /// Blocks the calling thread, handing each received message to
    /// <paramref name="func"/> until the consumer stops running, the token is
    /// cancelled, or the consumer's queue signals end of stream.
    /// </summary>
    /// <param name="queue">Name of the queue to consume from.</param>
    /// <param name="prefetchCount">
    /// Value passed to BasicQos; when 0, BasicQos is not called at all.
    /// </param>
    /// <param name="func">
    /// Message handler. Returning true acknowledges the message; returning false
    /// rejects it without requeueing.
    /// </param>
    /// <param name="token">Token checked before each dequeue and passed to it.</param>
    /// <exception cref="ArgumentNullException"><paramref name="func"/> is null.</exception>
    public void Consume(string queue, ushort prefetchCount, Func<IQueueMessage, bool> func, CancellationToken token)
    {
        // Fail before registering a consumer; otherwise the null handler would
        // only surface after a message had already been taken off the queue.
        if (func == null)
            throw new ArgumentNullException("func");

        var consumer = new CooperativeConsumer(channel);
        if (prefetchCount != 0)
            channel.BasicQos(0, prefetchCount, false);

        // Keep the tag so the consumer can be cancelled when this method exits.
        var consumerTag = channel.BasicConsume(queue, false, consumer);
        try
        {
            while (consumer.IsRunning && !token.IsCancellationRequested)
            {
                try
                {
                    var item = consumer.Queue.Dequeue(token);
                    if (item == null)
                        continue;

                    if (func(item))
                        channel.BasicAck(item.MessageId, false);
                    else
                        channel.BasicReject(item.MessageId, false);
                }
                catch (EndOfStreamException)
                {
                    break;
                }
            }
        }
        finally
        {
            // Leaving because of cancellation or a handler exception would
            // otherwise abandon a live consumer on the channel, which keeps
            // receiving (and holding) prefetched messages nobody will process.
            // When the consumer already stopped on its own there is nothing to cancel.
            if (consumer.IsRunning && channel.IsOpen)
            {
                try
                {
                    channel.BasicCancel(consumerTag);
                }
                catch (AlreadyClosedException)
                {
                    // Channel closed between the IsOpen check and the cancel;
                    // the broker drops the consumer with the channel.
                }
            }
        }
    }
}
/// <summary>
/// Publishes text or JSON-serialized messages to an exchange over the channel
/// supplied at construction.
/// </summary>
public class MessageQueuePublisher : IMessageQueuePublisher
{
    // Flags handed to BasicPublish (its "mandatory" and "immediate" arguments);
    // both are switched off for every message sent from this class.
    private const bool Mandatory = false;
    private const bool Immediate = false;

    private readonly IModel channel;

    /// <param name="channel">Channel used for every publish call.</param>
    public MessageQueuePublisher(IModel channel)
    {
        this.channel = channel;
    }

    /// <summary>
    /// Closes the channel unless it is already closed.
    /// </summary>
    public void Dispose()
    {
        if (!channel.IsOpen)
            return;

        channel.Close();
    }

    /// <summary>
    /// Publishes a text message to <paramref name="exchangeName"/> with an empty route.
    /// </summary>
    public void Publish(string exchangeName, string message, IQueueMessageProperties properties)
    {
        var noRoute = string.Empty;
        Publish(exchangeName, noRoute, message, properties);
    }

    /// <summary>
    /// Publishes a text message, encoded as UTF-8, to the given exchange and route.
    /// </summary>
    /// <param name="exchangeName">Exchange to publish to.</param>
    /// <param name="exchangeRoute">Route passed to BasicPublish.</param>
    /// <param name="message">Text that becomes the message body.</param>
    /// <param name="properties">Converted via QueueMessage.ConvertProperties before sending.</param>
    public void Publish(string exchangeName, string exchangeRoute, string message, IQueueMessageProperties properties)
    {
        // Arguments are evaluated left to right: properties are converted
        // before the body is encoded.
        channel.BasicPublish(
            exchangeName,
            exchangeRoute,
            Mandatory,
            Immediate,
            QueueMessage.ConvertProperties(channel, properties),
            Encoding.UTF8.GetBytes(message));
    }

    /// <summary>
    /// Publishes an object to <paramref name="exchangeName"/> with an empty route.
    /// </summary>
    public void Publish<T>(string exchangeName, T message, IQueueMessageProperties properties)
        where T : class
    {
        var noRoute = string.Empty;
        Publish<T>(exchangeName, noRoute, message, properties);
    }

    /// <summary>
    /// Publishes an object, serialized with JsonSerializer.SerializeToBytes,
    /// to the given exchange and route.
    /// </summary>
    /// <param name="exchangeName">Exchange to publish to.</param>
    /// <param name="exchangeRoute">Route passed to BasicPublish.</param>
    /// <param name="message">Object that is serialized into the message body.</param>
    /// <param name="properties">Converted via QueueMessage.ConvertProperties before sending.</param>
    public void Publish<T>(string exchangeName, string exchangeRoute, T message, IQueueMessageProperties properties)
        where T : class
    {
        // Same ordering as the text overload: properties first, then the body.
        channel.BasicPublish(
            exchangeName,
            exchangeRoute,
            Mandatory,
            Immediate,
            QueueMessage.ConvertProperties(channel, properties),
            JsonSerializer.SerializeToBytes(message));
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment