Last active
July 12, 2016 13:12
-
-
Save mahesh-singh/9214295 to your computer and use it in GitHub Desktop.
Consumer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Message subscriber implementation: declares the routing keys it wants to
/// receive and processes each delivered message on a background task.
/// </summary>
public class AuditSubscriber : IMessageSubscriber
{
    /// <summary>
    /// Routing-key patterns this subscriber wants bound to its queue.
    /// A new list is produced on every read, so callers may modify the
    /// returned list without affecting later reads.
    /// </summary>
    public IList<string> SubscribedRouteKeys
    {
        get { return SetSubscribedRoutes(); }
    }

    /// <summary>
    /// Processes one message asynchronously.
    /// </summary>
    /// <param name="MessageItem">The message to process.</param>
    /// <returns>
    /// A task whose result is the value returned by <see cref="MessageProcesser"/>.
    /// </returns>
    public async Task<bool> Process(Core.MessageInfo MessageItem)
    {
        // Run the (blocking) processing routine on its own long-running task so
        // the caller is not blocked while the work executes.
        return await Task<bool>.Factory.StartNew(
            () => MessageProcesser(MessageItem),
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    /// <summary>
    /// Synchronous processing routine executed by <see cref="Process"/>.
    /// </summary>
    /// <param name="MessageItem">The message to process.</param>
    /// <returns>Always <c>true</c> in this implementation.</returns>
    protected bool MessageProcesser(MessageInfo MessageItem)
    {
        Thread.Sleep(1000); // Stand-in for the actual work
        return true;
    }

    /// <summary>
    /// Builds the list of routing-key patterns exposed through
    /// <see cref="SubscribedRouteKeys"/>.
    /// </summary>
    /// <remarks>
    /// This was declared <c>abstract</c> inside a non-abstract class, which does
    /// not compile. It is now <c>virtual</c> with the original route list as its
    /// default, so derived subscribers can still override it.
    /// </remarks>
    /// <returns>A new list containing the subscribed routing-key patterns.</returns>
    protected virtual IList<string> SetSubscribedRoutes()
    {
        return new List<string>()
        {
            "*.inceitive.attested.*"
        };
    }
}
/// <summary>
/// RabbitMQ message consumer that starts one queue listener per
/// <c>SubscriberType</c> value and hands each received message to the
/// matching subscriber.
/// </summary>
public class RabbitMQMessageConsumer : AbstractRabbitMQClient, IMessageConsumer
{
    /// <summary>
    /// Opens the RabbitMQ connection and starts one listener task per
    /// <c>SubscriberType</c> enum value. The method returns as soon as the
    /// listeners have been started; it does not wait for them to finish.
    /// </summary>
    /// <param name="token">
    /// Token checked by every listener; once cancellation is requested each
    /// listener leaves its receive loop.
    /// </param>
    public void Consume(CancellationToken token)
    {
        // Start Rabbit MQ connection
        StartConnection(_ConnectionFactory.Get());

        List<Task> tasks = new List<Task>();
        foreach (SubscriberType subscriberType in (SubscriberType[])Enum.GetValues(typeof(SubscriberType)))
        {
            // ConsumeMessage is async, so StartNew yields a Task<Task> whose outer
            // task completes at the listener's first await. Unwrap() makes the
            // tracked task represent the whole listener lifetime instead.
            Task task = Task.Factory
                .StartNew(() => ConsumeMessage(subscriberType, token), CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default)
                .Unwrap();
            tasks.Add(task);
        }

        // The combined task used to be created and discarded, leaving listener
        // failures unobserved. Keep Consume non-blocking, but observe faults.
        Task.WhenAll(tasks).ContinueWith(
            finished =>
            {
                // Reading Exception marks the fault as observed. Each listener
                // has already written its own failure to the console.
                AggregateException observed = finished.Exception;
            },
            CancellationToken.None,
            TaskContinuationOptions.OnlyOnFaulted,
            TaskScheduler.Default);
    }

    /// <summary>
    /// Listens to the queue of one subscriber type until cancellation is
    /// requested or an exception ends the listener.
    /// </summary>
    /// <param name="subscriberType">
    /// Subscriber type to serve; its enum name is used as the queue name.
    /// </param>
    /// <param name="token">Token that stops the receive loop when cancelled.</param>
    /// <returns>A task that completes when the listener stops.</returns>
    async Task ConsumeMessage(SubscriberType subscriberType, CancellationToken token)
    {
        try
        {
            // Get message subscriber which will process the message
            IMessageSubscriber _MessageSubscriber = _MessageSubscriberFactory.GetMessageSubscriber(subscriberType);

            using (IModel _ConsumerChannel = _Connection.CreateModel())
            {
                _ConsumerChannel.ExchangeDeclare(_ExchangeProperties.Name, _ExchangeProperties.Type, _ExchangeProperties.Durable);

                string _QueueName = Enum.GetName(typeof(SubscriberType), subscriberType);
                _ConsumerChannel.QueueDeclare(_QueueName, _QueueProperties.Durable, _QueueProperties.Exclusive, _QueueProperties.AutoDelete, _QueueProperties.Arguments);

                // Bind the queue to every routing key the subscriber asks for
                foreach (string routeKey in _MessageSubscriber.SubscribedRouteKeys)
                {
                    _ConsumerChannel.QueueBind(_QueueName, _ExchangeProperties.Name, routeKey);
                }

                var consumer = new QueueingBasicConsumer(_ConsumerChannel);
                // Second argument false: messages are acknowledged explicitly below
                // (presumably the noAck flag -- confirm against the client version in use).
                _ConsumerChannel.BasicConsume(_QueueName, false, consumer);

                // Listen to the queue until cancellation is requested
                while (!token.IsCancellationRequested)
                {
                    BasicDeliverEventArgs eventArgs;

                    // Get a message or time out, so the token is re-checked regularly
                    if (!consumer.Queue.Dequeue(1000, out eventArgs) || eventArgs == null)
                    {
                        continue;
                    }

                    MessageInfo _MessageItem = ByteArrayToMessageInfo(eventArgs.Body);

                    // Message is processed by the subscriber's async method; wait for the result
                    bool _MessageProcessed = await _MessageSubscriber.Process(_MessageItem);
                    if (_MessageProcessed)
                    {
                        _ConsumerChannel.BasicAck(eventArgs.DeliveryTag, false);
                    }
                    else
                    {
                        // Last argument true: presumably requeues the message -- confirm.
                        _ConsumerChannel.BasicNack(eventArgs.DeliveryTag, false, true);
                    }
                }
            }
        }
        catch (Exception ex)
        {
            // Single logging point. The former inner catch for EndOfStreamException
            // only logged and rethrew into this handler, printing the message twice.
            Console.WriteLine(ex.Message);
            //TODO: Restart the task again
            throw;
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment