Skip to content

Instantly share code, notes, and snippets.

@mahesh-singh
Last active July 12, 2016 13:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mahesh-singh/9214295 to your computer and use it in GitHub Desktop.
Save mahesh-singh/9214295 to your computer and use it in GitHub Desktop.
Consumer
// Message subscriber implementation for audit messages.
// Declares the routing keys it wants bound to its queue and processes each
// delivered message on a dedicated (long-running) task.
public class AuditSubscriber : IMessageSubscriber
{
    // Routing keys this subscriber wants bound to its queue.
    // The list is produced by SetSubscribedRoutes(), so every read returns a
    // fresh list instance (same as before: callers cannot mutate shared state).
    public IList<string> SubscribedRouteKeys
    {
        get { return SetSubscribedRoutes(); }
    }

    // Processes one message asynchronously.
    // MessageItem: the message to process.
    // Returns: a task whose result is true when MessageProcesser reported success.
    public async Task<bool> Process(Core.MessageInfo MessageItem)
    {
        // The work is started on its own long-running task on the default
        // scheduler; no cancellation token is passed to it.
        return await Task<bool>.Factory.StartNew(
            () => MessageProcesser(MessageItem),
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    // Synchronous worker invoked by Process.
    // Returns: true to signal that the message was handled.
    protected bool MessageProcesser(MessageInfo MessageItem)
    {
        Thread.Sleep(1000); // Placeholder standing in for the actual work
        return true;
    }

    // Builds the list of subscribed routing keys.
    // This used to be declared 'abstract' inside a non-abstract class, which
    // does not compile (CS0513); it is now a virtual method with the default
    // routes so derived subscribers can still override it.
    protected virtual IList<string> SetSubscribedRoutes()
    {
        return new List<string>()
        {
            // NOTE(review): "inceitive" may be a typo of "incentive" - confirm
            // against the publisher's routing keys before changing it.
            "*.inceitive.attested.*"
        };
    }
}
// RabbitMQ consumer that starts one listening task per SubscriberType value.
// Each task declares and binds its own queue, then dequeues messages and hands
// them to the matching IMessageSubscriber until cancellation is requested.
public class RabbitMQMessageConsumer : AbstractRabbitMQClient, IMessageConsumer
{
    // Opens the RabbitMQ connection and starts one consumer task for every
    // value of the SubscriberType enum.
    // token: cancellation token checked by each consumer loop.
    // The method does not block: it returns once all tasks have been started.
    public void Consume(CancellationToken token)
    {
        // Start the RabbitMQ connection
        StartConnection(_ConnectionFactory.Get());

        List<Task> tasks = new List<Task>();
        foreach (SubscriberType subscriberType in (SubscriberType[])Enum.GetValues(typeof(SubscriberType)))
        {
            // Start listening on one queue per subscriber type available in the system.
            // ConsumeMessage is async, so StartNew yields a Task<Task> whose outer
            // task finishes at the first await; Unwrap() gives a task that tracks
            // the whole consumer loop, including any exception it throws.
            Task task = Task.Factory
                .StartNew(
                    () => ConsumeMessage(subscriberType, token),
                    CancellationToken.None,
                    TaskCreationOptions.LongRunning,
                    TaskScheduler.Default)
                .Unwrap();
            tasks.Add(task);
        }

        // Previously the result of Task.WhenAll was discarded, so consumer
        // failures were never observed. The failures themselves are already
        // written to the console inside ConsumeMessage; here they are only
        // marked as observed so they do not surface as unobserved task exceptions.
        Task.WhenAll(tasks).ContinueWith(
            finished => finished.Exception.Handle(inner => true),
            CancellationToken.None,
            TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);
    }

    // Listens to the queue that belongs to one subscriber type.
    // subscriberType: selects the subscriber and is also used as the queue name.
    // token: the loop exits when cancellation has been requested.
    // Any exception is written to the console and rethrown to the returned task.
    async Task ConsumeMessage(SubscriberType subscriberType, CancellationToken token)
    {
        try
        {
            // Get the message subscriber which will process the messages
            IMessageSubscriber subscriber = _MessageSubscriberFactory.GetMessageSubscriber(subscriberType);

            using (IModel channel = _Connection.CreateModel())
            {
                channel.ExchangeDeclare(_ExchangeProperties.Name, _ExchangeProperties.Type, _ExchangeProperties.Durable);

                // The queue is named after the enum member
                string queueName = Enum.GetName(typeof(SubscriberType), subscriberType);
                channel.QueueDeclare(queueName, _QueueProperties.Durable, _QueueProperties.Exclusive, _QueueProperties.AutoDelete, _QueueProperties.Arguments);

                foreach (string routeKey in subscriber.SubscribedRouteKeys)
                {
                    channel.QueueBind(queueName, _ExchangeProperties.Name, routeKey);
                }

                var consumer = new QueueingBasicConsumer(channel);
                // Second argument false: deliveries are acknowledged explicitly below
                channel.BasicConsume(queueName, false, consumer);

                // Listen to the queue until cancellation is requested
                while (!token.IsCancellationRequested)
                {
                    try
                    {
                        BasicDeliverEventArgs eventArgs;

                        // Get a message or time out (timeout argument: 1000).
                        // On timeout, or when no delivery was produced, go round
                        // again so the cancellation token is re-checked.
                        // NOTE(review): the original marked the "true but null"
                        // case as "connection is dead" without handling it - confirm
                        // whether it needs explicit recovery.
                        if (!consumer.Queue.Dequeue(1000, out eventArgs) || eventArgs == null)
                        {
                            continue;
                        }

                        MessageInfo messageItem = ByteArrayToMessageInfo(eventArgs.Body);

                        // The subscriber processes the message asynchronously; wait for its verdict
                        bool processed = await subscriber.Process(messageItem);

                        if (processed)
                        {
                            channel.BasicAck(eventArgs.DeliveryTag, false);
                        }
                        else
                        {
                            // Negative acknowledgement for this single delivery, with requeue
                            channel.BasicNack(eventArgs.DeliveryTag, false, true);
                        }
                    }
                    catch (EndOfStreamException ex)
                    {
                        Console.WriteLine(ex.Message);
                        throw;
                    }
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
            //TODO: Restart the task again
            throw;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment