Skip to content

Instantly share code, notes, and snippets.

@munr
Created February 28, 2012 20:37
Show Gist options
  • Save munr/1934946 to your computer and use it in GitHub Desktop.
Save munr/1934946 to your computer and use it in GitHub Desktop.
RabbitMQ Example
// RabbitMQReceiver Console Application
using System;
using DataContracts;
using RabbitMQ.Client;
using RabbitMQ.Client.MessagePatterns;
namespace RabbitMQReceiver
{
/// <summary>
/// Console receiver: consumes <see cref="SampleMessage"/> payloads from the
/// "MyQueue" queue on the local RabbitMQ broker and prints each one.
/// </summary>
internal class Program
{
    // Timestamp layout for console output. "mm" is minutes; the previous
    // format ("HH:ss") printed hours immediately followed by seconds.
    private const string TimestampFormat = "yyyy-MM-dd HH:mm:ss";

    /// <summary>
    /// Connects to the broker, subscribes to the queue and processes deliveries
    /// until the subscription ends or the process is interrupted with CTRL+C.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    private static void Main(string[] args)
    {
        Console.WriteLine("Waiting for messages... Press CTRL+C key to exit...");
        Console.CancelKeyPress += Console_CancelKeyPress;

        const string hostName = "localhost";
        const string queueName = "MyQueue";

        var connectionFactory = new ConnectionFactory {HostName = hostName};
        using (var connection = connectionFactory.CreateConnection())
        using (var model = connection.CreateModel())
        // noAck = false: every delivery must be acknowledged explicitly below.
        // Disposing the subscription cancels the consumer before the model closes.
        using (var subscription = new Subscription(model, queueName, false))
        {
            while (true)
            {
                var delivery = subscription.Next();
                if (delivery == null)
                {
                    // Next() yields null once the subscription/model has been closed;
                    // dereferencing it would throw, so stop consuming instead.
                    Console.WriteLine("Subscription closed; no more messages will be delivered.");
                    break;
                }

                var message = Util.ByteArrayToObject<SampleMessage>(delivery.Body);
                if (message != null)
                {
                    Console.WriteLine(DateTime.Now.ToString(TimestampFormat) + " : " + message.Name + ", " + message.Guid + ", Date: " + message.Date + ", Rand: " + message.RandomNumber);
                }
                else
                {
                    Console.WriteLine("Message decoding failed");
                }

                // Acknowledge in both cases, as the original did: an undecodable
                // message is dropped rather than left unacknowledged.
                subscription.Ack(delivery);
            }
        }
    }

    /// <summary>
    /// CTRL+C handler: prints a notice. <c>e.Cancel</c> is left untouched, so the
    /// runtime's default handling of the key press still applies.
    /// </summary>
    static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
    {
        Console.WriteLine("Exiting...");
    }
}
}
//DataContracts Class Library
using System;
namespace DataContracts
{
/// <summary>
/// Sample message payload exchanged between the sender and receiver console
/// applications. Marked <see cref="SerializableAttribute"/> so that
/// <c>Util</c> can serialize it with <c>BinaryFormatter</c>.
/// </summary>
[Serializable]
public class SampleMessage
{
/// <summary>Display name of the message (the sample sender uses "Test" plus a counter).</summary>
public string Name { get; set; }
/// <summary>Timestamp carried by the message (the sample sender uses <c>DateTime.Now</c> at creation).</summary>
public DateTime Date { get; set; }
/// <summary>Arbitrary number carried by the message (the sample sender draws it from <c>Random.Next(1, 10000)</c>).</summary>
public int RandomNumber { get; set; }
/// <summary>Identifier of the message (the sample sender assigns <c>Guid.NewGuid()</c>).</summary>
public Guid Guid { get; set; }
}
// RabbitMQSender Console Application
using System;
using System.Collections.Generic;
using System.Threading;
using DataContracts;
using RabbitMQ.Client;
namespace RabbitMQSender
{
/// <summary>
/// Console sender: declares a durable fanout exchange and queue on the local
/// RabbitMQ broker, binds them, and publishes one <see cref="SampleMessage"/>
/// per second.
/// </summary>
internal class Program
{
    // Timestamp layout for console output. "mm" is minutes; the previous
    // format ("HH:ss") printed hours immediately followed by seconds.
    private const string TimestampFormat = "yyyy-MM-dd HH:mm:ss";

    /// <summary>
    /// Sets up the exchange/queue topology and publishes messages until the
    /// counter reaches <see cref="Int32.MaxValue"/> or the process is
    /// interrupted with CTRL+C.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    private static void Main(string[] args)
    {
        const string hostName = "localhost";
        const string exchangeName = "MyExchange";
        const string queueName = "MyQueue";

        var connectionFactory = new ConnectionFactory {HostName = hostName};

        Console.WriteLine("Sending messages... Press CTRL+C to exit...");
        Console.WriteLine();
        Console.WriteLine();
        Console.CancelKeyPress += Console_CancelKeyPress;

        // One generator for the whole run instead of a fresh, time-seeded
        // instance per message.
        var random = new Random();

        using (var connection = connectionFactory.CreateConnection())
        using (IModel model = connection.CreateModel())
        {
            model.ExchangeDeclare(exchangeName, ExchangeType.Fanout, true);
            model.QueueDeclare(queueName, true, false, false, new Dictionary<string, object>());
            model.QueueBind(queueName, exchangeName, string.Empty, new Dictionary<string, object>());

            // Same range as the original counter: 1 .. Int32.MaxValue - 1.
            for (var i = 1; i < Int32.MaxValue; i++)
            {
                var message = new SampleMessage
                {
                    Name = "Test" + i,
                    Guid = Guid.NewGuid(),
                    Date = DateTime.Now,
                    RandomNumber = random.Next(1, 10000)
                };

                var bytes = Util.ObjectToByteArray(message);
                if (bytes == null)
                {
                    // ObjectToByteArray reports failures by returning null;
                    // do not hand a null body to the broker.
                    Console.WriteLine(DateTime.Now.ToString(TimestampFormat) + " : Skipped message " + i + ": serialization failed");
                }
                else
                {
                    var basicProperties = model.CreateBasicProperties();
                    // Fanout exchange: the routing key is ignored, so it is left empty.
                    model.BasicPublish(exchangeName, "", false, false, basicProperties, bytes);
                    Console.WriteLine(DateTime.Now.ToString(TimestampFormat) + " : Published to queue: " + i + ", Guid: " + message.Guid + ", Random Number: " + message.RandomNumber);
                }

                // Throttle to one attempt per second, whether or not it was published.
                Thread.Sleep(1000);
            }
        }
    }

    /// <summary>
    /// CTRL+C handler: prints a notice. <c>e.Cancel</c> is left untouched, so the
    /// runtime's default handling of the key press still applies.
    /// </summary>
    static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
    {
        Console.WriteLine("Exiting...");
    }
}
}
//DataContracts Class Library
using System;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
namespace DataContracts
{
/// <summary>
/// Helpers for converting objects to and from byte arrays using
/// <see cref="BinaryFormatter"/>.
/// </summary>
/// <remarks>
/// SECURITY NOTE(review): <see cref="BinaryFormatter"/> is unsafe on untrusted
/// input (deserialization can execute arbitrary code) and is obsolete in
/// current .NET. It is kept here to preserve the existing wire format; only
/// use these helpers with a trusted broker and trusted publishers, and plan a
/// migration to a safe serializer.
/// </remarks>
public static class Util
{
    /// <summary>
    /// Converts an object to a byte array.
    /// </summary>
    /// <param name="obj">The object to serialize; its type must be serializable.</param>
    /// <returns>
    /// The serialized bytes, or <c>null</c> when <paramref name="obj"/> is
    /// <c>null</c> or serialization fails (the failure is written to the console).
    /// </returns>
    public static byte[] ObjectToByteArray(Object obj)
    {
        if (obj == null)
        {
            return null;
        }

        try
        {
            var formatter = new BinaryFormatter();
            using (var ms = new MemoryStream())
            {
                formatter.Serialize(ms, obj);
                return ms.ToArray();
            }
        }
        catch (Exception ex)
        {
            // Deliberately best-effort: callers detect failure via the null result.
            Console.WriteLine("Error converting object to byte array: " + ex.Message);
            return null;
        }
    }

    /// <summary>
    /// Converts a byte array back to an object.
    /// </summary>
    /// <typeparam name="T">The type of object the byte array should be deserialized to.</typeparam>
    /// <param name="bytes">The array of bytes.</param>
    /// <returns>
    /// The deserialized instance, or <c>null</c> when <paramref name="bytes"/> is
    /// <c>null</c> or empty, the payload cannot be deserialized, or the result is
    /// not a <typeparamref name="T"/> (the failure is written to the console).
    /// </returns>
    public static T ByteArrayToObject<T>(byte[] bytes) where T : class
    {
        // Handle missing input explicitly instead of relying on a caught
        // NullReferenceException / SerializationException.
        if (bytes == null || bytes.Length == 0)
        {
            Console.WriteLine("Error converting byte array to object: no data to deserialize");
            return null;
        }

        try
        {
            var formatter = new BinaryFormatter();
            // Wrap the caller's buffer read-only; no copy or seek is needed.
            using (var ms = new MemoryStream(bytes, false))
            {
                return (T)formatter.Deserialize(ms);
            }
        }
        catch (Exception ex)
        {
            // Deliberately best-effort: callers detect failure via the null result.
            Console.WriteLine("Error converting byte array to object: " + ex.Message);
            return null;
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment