Skip to content

Instantly share code, notes, and snippets.

@meisinger
Last active August 29, 2015 14:02
Show Gist options
  • Save meisinger/7d6f594faf01f7bfc28a to your computer and use it in GitHub Desktop.
Save meisinger/7d6f594faf01f7bfc28a to your computer and use it in GitHub Desktop.
consumer for cooperative queue
using RabbitMQ.Client
public class CooperativeConsumer : IBasicConsumer
{
private readonly IModel channel;
private readonly CooperativeQueue<IQueueMessage> queue;
public IModel Model
{
get { return channel; }
}
public CooperativeQueue<IQueueMessage> Queue
{
get { return queue; }
}
public bool IsRunning { get; private set; }
public string ConsumerTag { get; private set; }
public ShutdownEventArgs ShutdownReason { get; private set; }
public CooperativeConsumer(IModel channel)
: this (channel, new CooperativeQueue<IQueueMessage>())
{
}
public CooperativeConsumer(IModel channel, CooperativeQueue<IQueueMessage> queue)
{
this.channel = channel;
this.queue = queue;
IsRunning = false;
ConsumerTag = null;
ShutdownReason = null;
}
public void HandleBasicCancel(string cosnumerTag)
{
OnCancel();
}
public void HandleBasicCancelOk(string consumerTag)
{
OnCancel();
}
public void HandleBasicConsumeOk(string consumerTag)
{
ConsumerTag = consumerTag;
IsRunning = true;
}
public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, IBasicProperties properties, byte[] body)
{
var messageProperties = QueueMessage.ConvertProperties(properties);
var correlationId = "not-set";
var action = "not-set";
var type = "not-set";
if (messageProperties.IsCorrelationIdSet)
correlationId = messageProperties.CorrelationId;
if (messageProperties.IsMessageActionSet)
action = messageProperties.MessageAction;
if (messageProperties.IsMessageTypeSet)
type = messageProperties.MessageType;
var message = new QueueMessage
{
MessageId = deliveryTag,
Action = action,
Type = type,
CorrelationId = correlationId,
ConsumerId = consumerTag,
Exchange = exchange,
ExchangeRoute = routingKey,
Redelivered = redelivered,
Body = body,
Properties = messageProperties
};
queue.Enqueue(message);
}
public void HandleModelShutdown(IModel model, ShutdownEventArgs reason)
{
ShutdownReason = reason;
OnCancel();
}
public void OnCancel()
{
queue.Close();
IsRunning = false;
}
}
public interface IQueueMessage
{
ulong MessageId { get; }
string Action { get; }
string Type { get; }
string CorrelationId { get; }
string ConsumerId { get; }
string Exchange { get; }
string ExchangeRoute { get; }
byte[] Body { get; }
bool Redelivered { get; }
IQueueMessageProperties Properties { get; }
}
public interface IQueueMessageProperties
{
bool IsAppIdSet { get; }
bool IsContentEncodingSet { get; }
bool IsContentTypeSet { get; }
bool IsCorrelationIdSet { get; }
bool IsExpirationSet { get; }
bool IsExternalMessageSet { get; }
bool IsMessageActionSet { get; }
bool IsMessageTypeSet { get; }
bool IsPrioritySet { get; }
bool IsReplyToSet { get; }
bool IsTimestampSet { get; }
bool IsTypeSet { get; }
bool IsUserIdSet { get; }
bool Persistent { get; }
bool HasHeaders { get; }
string AppId { get; }
string ContentEncoding { get; }
string ContentType { get; }
string CorrelationId { get; }
string Expiration { get; }
string ExternalMessageId { get; }
string MessageAction { get; }
string MessageType { get; }
string ReplyTo { get; }
string Type { get; }
string UserId { get; }
long Timestamp { get; }
int Priority { get; }
IEnumerable<KeyValuePair<string, object>> Headers { get; }
void AddHeader(string key, object value);
}
public class QueueMessage : IQueueMessage
{
public ulong MessageId { get; set; }
public string Action { get; set; }
public string Type { get; set; }
public string CorrelationId { get; set; }
public string ConsumerId { get; set; }
public string Exchange { get; set; }
public string ExchangeRoute { get; set; }
public byte[] Body { get; set; }
public bool Redelivered { get; set; }
public IQueueMessageProperties Properties { get; set; }
public static IBasicProperties ConvertProperties(IModel channel, IQueueMessageProperties properties)
{
var item = channel.CreateBasicProperties();
if (properties == null)
return item;
if (properties.IsAppIdSet)
item.AppId = properties.AppId;
if (properties.IsContentEncodingSet)
item.ContentEncoding = properties.ContentEncoding;
if (properties.IsContentTypeSet)
item.ContentType = properties.ContentType;
if (properties.IsCorrelationIdSet)
item.CorrelationId = properties.CorrelationId;
if (properties.IsExpirationSet)
item.Expiration = properties.Expiration;
if (properties.IsExternalMessageSet)
item.MessageId = properties.ExternalMessageId;
if (properties.IsReplyToSet)
item.ReplyTo = properties.ReplyTo;
if (properties.IsTypeSet)
item.Type = properties.Type;
if (properties.IsUserIdSet)
item.UserId = properties.UserId;
if (properties.IsPrioritySet)
item.Priority = (byte)properties.Priority;
if (properties.IsTimestampSet)
item.Timestamp = new AmqpTimestamp(properties.Timestamp);
item.Headers = new Dictionary<string, object>();
item.Headers["x-action"] = properties.MessageAction;
item.Headers["x-type"] = properties.MessageType;
if (properties.HasHeaders)
foreach (var header in properties.Headers)
item.Headers[header.Key] = header.Value;
item.SetPersistent(properties.Persistent);
return item;
}
public static IQueueMessageProperties ConvertProperties(IBasicProperties properties)
{
var item = new QueueMessageProperties
{
AppId = properties.AppId,
ContentEncoding = properties.ContentEncoding,
ContentType = properties.ContentType,
CorrelationId = properties.CorrelationId,
Expiration = properties.Expiration,
ExternalMessageId = properties.MessageId,
ReplyTo = properties.ReplyTo,
Type = properties.Type,
UserId = properties.UserId
};
if (properties.IsDeliveryModePresent())
item.Persistent = (properties.DeliveryMode.Equals(2));
if (properties.IsPriorityPresent())
item.Priority = properties.Priority;
if (properties.IsTimestampPresent())
item.Timestamp = properties.Timestamp.UnixTime;
if (!properties.IsHeadersPresent())
return item;
var collection = properties.Headers as Hashtable;
if (collection == null)
return item;
if (collection["x-action"] != null)
{
var bytes = collection["x-action"] as byte[];
if (bytes != null && bytes.Length != 0)
item.MessageAction = Encoding.UTF8.GetString(bytes);
}
if (collection["x-type"] != null)
{
var bytes = collection["x-type"] as byte[];
if (bytes != null && bytes.Length != 0)
item.MessageType = Encoding.UTF8.GetString(bytes);
}
foreach (string key in collection.Keys)
{
if (key.Equals("x-action", StringComparison.Ordinal))
continue;
if (key.Equals("x-type", StringComparison.Ordinal))
continue;
item.AddHeader(key, collection[key]);
}
return item;
}
}
public class QueueMessageProperties : IQueueMessageProperties
{
private readonly Dictionary<string, object> headers;
private string appId;
private string contentEncoding;
private string contentType;
private string correlationId;
private string expiration;
private string externalMessageId;
private string messageAction;
private string messageType;
private string replyTo;
private string type;
private string userId;
private long? timestamp;
private int? priority;
public bool HasHeaders
{
get { return (headers.Count != 0); }
}
public IEnumerable<KeyValuePair<string, object>> Headers
{
get { return headers; }
}
public bool IsAppIdSet { get; private set; }
public bool IsContentEncodingSet { get; private set; }
public bool IsContentTypeSet { get; private set; }
public bool IsCorrelationIdSet { get; private set; }
public bool IsExpirationSet { get; private set; }
public bool IsExternalMessageSet { get; private set; }
public bool IsMessageActionSet { get; private set; }
public bool IsMessageTypeSet { get; private set; }
public bool IsReplyToSet { get; private set; }
public bool IsTypeSet { get; private set; }
public bool IsUserIdSet { get; private set; }
public bool Persistent { get; set; }
public bool IsPrioritySet
{
get { return (priority.HasValue); }
}
public bool IsTimestampSet
{
get { return (timestamp.HasValue); }
}
public string AppId
{
get { return appId; }
set {
appId = value;
IsAppIdSet = !(string.IsNullOrEmpty(value));
}
}
public string ContentEncoding
{
get { return contentEncoding; }
set {
contentEncoding = value;
IsContentEncodingSet = !(string.IsNullOrEmpty(value));
}
}
public string ContentType
{
get { return contentType; }
set {
contentType = value;
IsContentTypeSet = !(string.IsNullOrEmpty(value));
}
}
public string CorrelationId
{
get { return correlationId; }
set {
correlationId = value;
IsCorrelationIdSet = !(string.IsNullOrEmpty(value));
}
}
public string Expiration
{
get { return expiration; }
set {
expiration = value;
IsExpirationSet = !(string.IsNullOrEmpty(value));
}
}
public string ExternalMessageId
{
get { return externalMessageId; }
set {
externalMessageId = value;
IsExternalMessageSet = !(string.IsNullOrEmpty(value));
}
}
public string MessageAction
{
get { return messageAction; }
set {
messageAction = value;
IsMessageActionSet = !(string.IsNullOrEmpty(value));
}
}
public string MessageType
{
get { return messageType; }
set {
messageType = value;
IsMessageTypeSet = !(string.IsNullOrEmpty(value));
}
}
public string ReplyTo
{
get { return replyTo; }
set {
replyTo = value;
IsReplyToSet = !(string.IsNullOrEmpty(value));
}
}
public string Type
{
get { return type; }
set {
type = value;
IsTypeSet = !(string.IsNullOrEmpty(value));
}
}
public string UserId
{
get { return userId; }
set {
userId = value;
IsUserIdSet = !(string.IsNullOrEmpty(value));
}
}
public int Priority
{
get { return priority ?? 0; }
set { priority = value; }
}
public long Timestamp
{
get { return timestamp ?? 0; }
set { timestamp = value; }
}
public QueueMessageProperties()
{
headers = new Dictionary<string, object>();
}
public void AddHeader(string key, object value)
{
if (string.IsNullOrEmpty(key))
return;
if (value == null)
return;
headers[key] = value;
}
public static QueueMessageProperties CreatePersistent(string type, string action)
{
var correlationId = Guid.NewGuid().ToString("N");
return CreatePersistent(correlationId, type, action);
}
public static QueueMessageProperties CreatePersistent(string correlationId, string type, string action)
{
return new QueueMessageProperties
{
CorrelationId = correlationId,
MessageType = type,
MessageAction = action,
Persistent = true,
};
}
public static QueueMessageProperties CreateTransient(string type, string action)
{
var correlationId = Guid.NewGuid().ToString("N");
return CreateTransient(correlationId, type, action);
}
public static QueueMessageProperties CreateTransient(string correlationId, string type, string action)
{
return new QueueMessageProperties
{
CorrelationId = correlationId,
MessageType = type,
MessageAction = action,
Persistent = true,
};
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment