Skip to content

Instantly share code, notes, and snippets.

@jayhilden
Created April 11, 2016 15:01
Show Gist options
  • Save jayhilden/2078872a53c7df0fe45d661861ed2d45 to your computer and use it in GitHub Desktop.
Save jayhilden/2078872a53c7df0fe45d661861ed2d45 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RabbitMQ.Client.Events;
using Verify.Platform.Infrastructure.RabbitMQ.Producers;
namespace Verify.Platform.Infrastructure.RabbitMQ
{
/// <summary>
/// Retry helpers for messages delivered by a RabbitMQ consumer.
/// </summary>
public interface IRabbitUtilityService
{
/// <summary>
/// Attempts to retry the delivered message.
/// The <c>RabbitUtilityService</c> implementation in this file increments the
/// <c>RetryCount</c> header and republishes the message to the queue named in a
/// "rejected" <c>x-death</c> header entry. It throws
/// <c>RabbitRetryLimitReachedException</c> once the count has reached
/// <c>MaxRetryCount</c>, and <c>RabbitRetryFailedException</c> when no
/// "rejected" <c>x-death</c> entry is found.
/// </summary>
/// <param name="deliverEventArgs">The delivery to retry.</param>
void AttemptRetry(BasicDeliverEventArgs deliverEventArgs);
/// <summary>
/// Returns the retry count recorded on the delivered message.
/// The <c>RabbitUtilityService</c> implementation in this file reads the
/// <c>RetryCount</c> header and returns 0 when the header is absent or cannot
/// be parsed as an integer.
/// </summary>
/// <param name="deliverEventArgs">The delivery whose headers are inspected.</param>
/// <returns>The number of retries recorded so far.</returns>
int GetCurrentRetryCount(BasicDeliverEventArgs deliverEventArgs);
}
/// <summary>
/// Re-queues dead-lettered RabbitMQ messages with a bounded retry count that is
/// carried in the <see cref="HeaderKeyRetryCount"/> message header.
/// </summary>
internal class RabbitUtilityService : IRabbitUtilityService
{
    private readonly IRabbitMessageBusProducerInternal _messageBusProducer;

    /// <summary>Retry count at which <see cref="AttemptRetry"/> stops republishing.</summary>
    public const ushort MaxRetryCount = 10;

    /// <summary>Name of the message header that stores the retry count.</summary>
    public const string HeaderKeyRetryCount = "RetryCount";

    /// <summary>Creates the service.</summary>
    /// <param name="messageBusProducer">Producer used to republish retried messages.</param>
    public RabbitUtilityService(IRabbitMessageBusProducerInternal messageBusProducer)
    {
        _messageBusProducer = messageBusProducer;
    }

    /// <summary>
    /// Republishes the delivered message to the queue it was rejected from, after
    /// incrementing its <see cref="HeaderKeyRetryCount"/> header. This method does not
    /// ack or nack the delivery itself; it reports failure by throwing.
    /// </summary>
    /// <param name="deliverEventArgs">The delivery to retry.</param>
    /// <exception cref="ArgumentNullException"><paramref name="deliverEventArgs"/> is null.</exception>
    /// <exception cref="RabbitRetryLimitReachedException">
    /// The message has already been retried <see cref="MaxRetryCount"/> times.
    /// </exception>
    /// <exception cref="RabbitRetryFailedException">
    /// The <c>x-death</c> header has no usable "rejected" entry to take the queue name from.
    /// </exception>
    /// <exception cref="ArgumentException">The message has no <c>x-death</c> header.</exception>
    /// <exception cref="NotSupportedException">The <c>x-death</c> header is not a <c>List&lt;object&gt;</c>.</exception>
    public void AttemptRetry(BasicDeliverEventArgs deliverEventArgs)
    {
        if (deliverEventArgs == null)
        {
            throw new ArgumentNullException(nameof(deliverEventArgs));
        }

        // Get count of previous retry attempts
        var retryCount = GetCurrentRetryCount(deliverEventArgs);
        if (retryCount >= MaxRetryCount)
        {
            Console.WriteLine("Max retry reached");
            throw new RabbitRetryLimitReachedException();
        }

        string originatingQueueName;
        if (!TryGetOriginatingQueue(deliverEventArgs, out originatingQueueName))
        {
            throw new RabbitRetryFailedException();
        }

        // Retry. Headers is non-null here: the x-death entry was just read from it.
        retryCount++;
        deliverEventArgs.BasicProperties.Headers[HeaderKeyRetryCount] = retryCount;
        Console.WriteLine("{0} = {1}", HeaderKeyRetryCount, retryCount);
        Console.WriteLine("publishing back into queue {0}", originatingQueueName);

        // NOTE(review): presumably Publish(exchange, routingKey, properties, body), so "" would be
        // the default exchange routing straight to the named queue - confirm against the producer.
        _messageBusProducer.Publish("", originatingQueueName, deliverEventArgs.BasicProperties, deliverEventArgs.Body);
    }

    /// <summary>
    /// Reads the retry count stored in the <see cref="HeaderKeyRetryCount"/> header.
    /// </summary>
    /// <param name="deliverEventArgs">The delivery to inspect; may be null.</param>
    /// <returns>
    /// The parsed count, or 0 when the argument, its properties, its headers or the header
    /// value is missing/null, or when the value cannot be parsed as an integer.
    /// </returns>
    public virtual int GetCurrentRetryCount(BasicDeliverEventArgs deliverEventArgs)
    {
        var headers = deliverEventArgs?.BasicProperties?.Headers;
        object rawValue;
        if (headers == null || !headers.TryGetValue(HeaderKeyRetryCount, out rawValue) || rawValue == null)
        {
            return 0;
        }

        // A byte[] value must be decoded rather than ToString()'d: byte[].ToString() yields
        // "System.Byte[]", which would parse as "no retries yet" and defeat the retry limit.
        var rawBytes = rawValue as byte[];
        var retryHeader = rawBytes != null
            ? ConvertFromUTF8ToString(rawBytes)
            : Convert.ToString(rawValue, System.Globalization.CultureInfo.InvariantCulture);

        // Header data is machine-written, so format and parse with the invariant culture.
        int retryCount;
        return int.TryParse(
                retryHeader,
                System.Globalization.NumberStyles.Integer,
                System.Globalization.CultureInfo.InvariantCulture,
                out retryCount)
            ? retryCount
            : 0;
    }

    #region Helpers

    /// <summary>Decodes UTF-8 bytes into a string.</summary>
    /// <param name="bytes">The bytes to decode; may be null.</param>
    /// <returns>The decoded string, or null when <paramref name="bytes"/> is null.</returns>
    internal virtual string ConvertFromUTF8ToString(byte[] bytes)
    {
        // A raw Buffer.BlockCopy of the bytes into a char[] was tried previously and does
        // not work properly for how rabbit encodes the data, hence the explicit UTF-8 decode.
        return bytes == null
            ? null
            : Encoding.UTF8.GetString(bytes);
    }

    /// <summary>
    /// Returns the elements of type <typeparamref name="T"/> contained in a list-valued header.
    /// </summary>
    /// <typeparam name="T">Element type to keep; elements of other types are skipped.</typeparam>
    /// <param name="deliverEventArgs">The delivery whose headers are read.</param>
    /// <param name="key">Name of the header to read.</param>
    /// <returns>A lazily evaluated sequence of the matching elements.</returns>
    /// <exception cref="ArgumentException">
    /// The message has no headers or no header named <paramref name="key"/>.
    /// </exception>
    /// <exception cref="NotSupportedException">The header value is null or not a <c>List&lt;object&gt;</c>.</exception>
    internal virtual IEnumerable<T> ParseRabbitHeaderValue<T>(BasicDeliverEventArgs deliverEventArgs, string key)
    {
        var headers = deliverEventArgs?.BasicProperties?.Headers;
        object val;
        if (headers == null || !headers.TryGetValue(key, out val))
        {
            throw new ArgumentException($"missing {key} key", nameof(deliverEventArgs));
        }

        var list = val as List<object>;
        if (list == null)
        {
            // val may itself be null, so do not call GetType() on it unconditionally.
            throw new NotSupportedException($"type {val?.GetType().ToString() ?? "null"} is not yet supported");
        }

        return list.OfType<T>();
    }

    /// <summary>
    /// Finds the queue name recorded in the first <c>x-death</c> entry whose reason is "rejected".
    /// </summary>
    /// <param name="deliverEventArgs">The delivery whose <c>x-death</c> header is read.</param>
    /// <param name="originatingQueueName">
    /// Receives the queue name on success; null when the method returns false.
    /// </param>
    /// <returns>
    /// True when a "rejected" entry with a readable queue name was found; false otherwise.
    /// Entries without a readable "reason" are skipped instead of throwing.
    /// </returns>
    /// <exception cref="ArgumentException">The message has no <c>x-death</c> header.</exception>
    /// <exception cref="NotSupportedException">The <c>x-death</c> header is not a <c>List&lt;object&gt;</c>.</exception>
    internal virtual bool TryGetOriginatingQueue(BasicDeliverEventArgs deliverEventArgs, out string originatingQueueName)
    {
        const string headerKeyDeadLetter = "x-death";
        const string headerKeyReason = "reason";
        const string headerKeyQueue = "queue";
        const string reasonRejected = "rejected";

        originatingQueueName = null;

        var rejectedDeathHeader = ParseRabbitHeaderValue<IDictionary<string, object>>(deliverEventArgs, headerKeyDeadLetter)
            .FirstOrDefault(header => ReadTextEntry(header, headerKeyReason) == reasonRejected);
        if (rejectedDeathHeader == null)
        {
            return false;
        }

        originatingQueueName = ReadTextEntry(rejectedDeathHeader, headerKeyQueue);
        return originatingQueueName != null;
    }

    /// <summary>
    /// Reads a text entry from a header table: a string is returned as-is, a byte[] is
    /// decoded as UTF-8, and a missing key or any other value type yields null.
    /// </summary>
    private string ReadTextEntry(IDictionary<string, object> table, string key)
    {
        object raw;
        if (!table.TryGetValue(key, out raw))
        {
            return null;
        }

        return raw as string ?? ConvertFromUTF8ToString(raw as byte[]);
    }

    #endregion
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment