Skip to content

Instantly share code, notes, and snippets.

@dfch
Last active Aug 29, 2015
Embed
What would you like to do?
Simple RabbitMQ Wrapper for PowerShell
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Diagnostics;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Util;
// Add reference to Rabbit.MQ.Client.dll
namespace RabbitMQHelper
{
public class Client : IDisposable
{
#region ========== Constants ==========
private const string _VirtualHostDefault = "/";
private const string _ServerDefault = "localhost";
private const int _PortDefault = AmqpTcpEndpoint.UseDefaultPort; //5672;
private const string _UsernameDefault = "guest";
private const string _PasswordDefault = "guest";
private const string _ExchangeNameDefault = "";
private const string _QueueNameDefault = "default";
private const bool _AutoDeleteDefault = false;
private const bool _DurableDefault = true;
private const bool _ConnectionAutoCloseDefault = false;
private const int _WaitMilliSecondsDefault = -1;
#endregion
#region ========== Properties ==========
private bool disposed = false;
string _Server = _ServerDefault;
public string Server
{
get { return _Server; }
set { _Server = value; }
}
int _Port = _PortDefault;
public int Port
{
get { return _Port; }
set { _Port = value; }
}
string _QueueName = _QueueNameDefault;
public string QueueName
{
get { return _QueueName; }
set { _QueueName = value; }
}
private bool _Durable = _DurableDefault;
public bool Durable
{
get { return _Durable; }
set { _Durable = value; }
}
private bool _Exclusive;
public bool Exclusive
{
get { return _Exclusive; }
set { _Exclusive = value; }
}
private bool _AutoDelete = _AutoDeleteDefault;
public bool AutoDelete
{
get { return _AutoDelete; }
set { _AutoDelete = value; }
}
private bool _ConnectionAutoClose = _ConnectionAutoCloseDefault;
public bool ConnectionAutoClose
{
get { return _ConnectionAutoClose; }
set { _ConnectionAutoClose = value; }
}
private string _ExchangeName = _ExchangeNameDefault;
public string ExchangeName
{
get { return _ExchangeName; }
set { _ExchangeName = value; }
}
private QueueingBasicConsumer _Consumer;
public QueueingBasicConsumer Consumer
{
get { return _Consumer; }
set { _Consumer = value; }
}
private IDictionary<string, object> _QueueArguments = new Dictionary<string, object>();
public IDictionary<string, object> QueueArguments
{
get { return _QueueArguments; }
set { _QueueArguments = value; }
}
private QueueDeclareOk _QueueDeclareReturn;
public QueueDeclareOk QueueDeclareReturn
{
get { return _QueueDeclareReturn; }
set { _QueueDeclareReturn = value; }
}
private string _Username = _UsernameDefault;
public string Username
{
get { return _Username; }
set { _Username = value; }
}
private string _Password = _PasswordDefault;
public string Password
{
//get { return _Password; }
set { _Password = value; }
}
private int _WaitMilliSeconds = _WaitMilliSecondsDefault;
public int WaitMilliSeconds
{
get { return _WaitMilliSeconds; }
set { _WaitMilliSeconds = value; }
}
private string _VirtualHost = _VirtualHostDefault;
public string VirtualHost
{
get { return _VirtualHost; }
set { _VirtualHost = value; }
}
private IProtocol _Protocol = Protocols.FromEnvironment();
public IProtocol Protocol
{
get { return _Protocol; }
set { _Protocol = value; }
}
private ConnectionFactory _ConnectionFactory;
public ConnectionFactory ConnectionFactory
{
get { return _ConnectionFactory; }
set { _ConnectionFactory = value; }
}
private IConnection _Connection;
public IConnection Connection
{
get { return _Connection; }
set { _Connection = value; }
}
private IModel _Channel;
public IModel Channel
{
get { return _Channel; }
set { _Channel = value; }
}
#endregion
#region ========== Methods ==========
public IConnection Connect()
{
return Connect(_VirtualHost, _Server, _Port, _Username, _Password, null);
}
public IConnection Connect(string Server, string Username, string Password)
{
return Connect(_VirtualHost, _Server, _Port, Username, Password, null);
}
public IConnection Connect(string VirtualHost, string Server, string Username, string Password)
{
return Connect(VirtualHost, Server, _Port, Username, Password, null);
}
public IConnection Connect(string VirtualHost, string Server, int? Port, string Username, string Password, int? MaxRedirects)
{
if (null == _Connection || !_Connection.IsOpen)
{
if (null == _ConnectionFactory)
{
var __VirtualHost = VirtualHost ?? _VirtualHost;
var __Server = Server ?? _Server;
var __Port = Port ?? _Port;
var __Username = Username ?? _Username;
var __Password = Password ?? _Password;
_ConnectionFactory = new ConnectionFactory()
{
Protocol = _Protocol
,
VirtualHost = __VirtualHost
,
HostName = __Server
,
Port = __Port
,
UserName = __Username
,
Password = __Password
};
}
if (null == MaxRedirects)
{
_Connection = _ConnectionFactory.CreateConnection();
}
else
{
_Connection = _ConnectionFactory.CreateConnection((int) MaxRedirects);
}
}
return _Connection;
}
public void Disconnect()
{
Disconnect(true, true, true, false);
return;
}
public void Disconnect(bool CloseConnection, bool CloseChannel, bool CloseConsumer, bool CloseQueue)
{
if (null != _Consumer)
{
if (CloseQueue)
{
_Consumer.Queue.Close();
}
if (CloseConsumer)
{
_Consumer = null;
}
}
if (CloseChannel)
{
if ((null != _Channel) && _Channel.IsOpen)
{
_Channel.Close();
}
_QueueDeclareReturn = null;
_Channel = null;
}
if (CloseConnection)
{
if (null != _Connection)
{
if (_Connection.IsOpen)
{
_Connection.Close();
}
_Connection = null;
if (null != _ConnectionFactory)
{
_ConnectionFactory = null;
}
}
}
return;
}
public IModel CreateChannel()
{
if (null == _Channel)
{
Connect();
_Channel = _Connection.CreateModel();
_Connection.AutoClose = _ConnectionAutoClose;
}
return _Channel;
}
public QueueDeclareOk CreateQueue(string QueueName)
{
if (null == _QueueDeclareReturn || !_QueueDeclareReturn.QueueName.Equals(QueueName, StringComparison.CurrentCultureIgnoreCase))
{
_QueueName = QueueName;
CreateChannel();
_QueueDeclareReturn = _Channel.QueueDeclare(_QueueName, _Durable, _Exclusive, _AutoDelete, _QueueArguments);
_Channel.BasicQos(0, 1, false);
}
return _QueueDeclareReturn;
}
public string Receive()
{
return Receive(_QueueName, _WaitMilliSeconds);
}
public string Receive(string QueueName)
{
return Receive(QueueName, _WaitMilliSeconds);
}
public string Receive(int? WaitMilliSeconds)
{
return Receive(_QueueName, WaitMilliSeconds);
}
public string Receive(string QueueName, int? WaitMilliSeconds)
{
//var fReturn = false;
var __QueueName = QueueName ?? _QueueName;
var __WaitMilliSeconds = WaitMilliSeconds ?? _WaitMilliSeconds;
string message = null;
try {
CreateQueue(_QueueName);
if (null == _Consumer)
{
_Consumer = new QueueingBasicConsumer(_Channel);
_Channel.BasicConsume(__QueueName, false, _Consumer);
}
BasicDeliverEventArgs ea;
bool fReturn = _Consumer.Queue.Dequeue((int)__WaitMilliSeconds, out ea);
if (fReturn)
{
var body = ea.Body;
message = Encoding.UTF8.GetString(body);
_Channel.BasicAck(ea.DeliveryTag, false);
}
}
catch (Exception ex)
{
Debug.WriteLine(string.Format("{0}: {1}\n{2}", ex.HResult, ex.Message, ex.StackTrace));
if (null != ex.InnerException)
{
Debug.WriteLine(string.Format("{0}: {1}\n{2}", ex.InnerException.HResult, ex.InnerException.Message, ex.InnerException.StackTrace));
}
throw;
}
return message;
}
public bool Send(string message)
{
return Send(_QueueName, message);
}
public bool Send(string QueueName, string message)
{
var fReturn = false;
try
{
CreateQueue(_QueueName);
var body = Encoding.UTF8.GetBytes(message);
var properties = _Channel.CreateBasicProperties();
properties.SetPersistent(true);
_Channel.BasicPublish(_ExchangeName, _QueueName, properties, body);
fReturn = true;
}
catch (Exception ex)
{
Debug.WriteLine(string.Format("{0}: {1}\n{2}", ex.HResult, ex.Message, ex.StackTrace));
if (null != ex.InnerException)
{
Debug.WriteLine(string.Format("{0}: {1}\n{2}", ex.InnerException.HResult, ex.InnerException.Message, ex.InnerException.StackTrace));
}
throw;
}
return fReturn;
}
public Client()
{
}
public Client(string Server, string Username, string Password)
{
_Server = Server;
_Username = Username;
_Password = Password;
}
public Client(string VirtualHost, string Server, string Username, string Password)
{
_VirtualHost = VirtualHost;
_Server = Server;
_Username = Username;
_Password = Password;
}
public Client(string VirtualHost, string Server, int Port, string Username, string Password)
{
_VirtualHost = VirtualHost;
_Server = Server;
_Port = Port;
_Username = Username;
_Password = Password;
}
~Client()
{
Dispose(false);
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected void Dispose(bool disposing)
{
try
{
if (disposed)
return;
if (disposing)
{
// Free any other managed objects here.
//
}
// Free any unmanaged objects here.
//
if (null != _Channel && _Channel.IsOpen)
{
}
_Channel = null;
if (null != _Connection && _Connection.IsOpen)
{
}
_Connection = null;
if (null != _Consumer)
{
}
_Consumer = null;
Disconnect();
disposed = true;
}
catch (Exception ex)
{
Debug.WriteLine(string.Format("{0}: {1}\n{2}", ex.HResult, ex.Message, ex.StackTrace));
if (null != ex.InnerException)
{
Debug.WriteLine(string.Format("{0}: {1}\n{2}", ex.InnerException.HResult, ex.InnerException.Message, ex.InnerException.StackTrace));
}
throw;
}
}
#endregion
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment