Skip to content

Instantly share code, notes, and snippets.

@danbarua
Created August 30, 2012 13:21
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save danbarua/3528413 to your computer and use it in GitHub Desktop.
Save danbarua/3528413 to your computer and use it in GitHub Desktop.
RabbitMQ reliable consumer pattern
while (_isRunning)
{
try
{
if (channel == null || consumer == null)
{
try
{
_connection = factory.CreateConnection();
channel = _connection.CreateModel();
consumer = CreateConsumer(channel); //create your subscription and consumer here
}
catch (BrokerUnreachableException ex)
{
Log.Warn(
"Unable to connect to RabbitMq broker on " + _config.HostName
+ ". Retrying in " + backoffRetryTimeout + " seconds.", ex);
//exponentially increase wait timeout until max reached
Thread.Sleep(backoffRetryTimeout*1000);
backoffRetryTimeout += backoffRetryTimeout;
if (backoffRetryTimeout >= MAX_BACKOFF_TIME_SECONDS)
backoffRetryTimeout = MAX_BACKOFF_TIME_SECONDS;
continue;
}
}
if (consumer != null)
{
try
{
var basicDeliverEventArgs = (BasicDeliverEventArgs)(consumer.Queue.Dequeue());
string messageBody = Encoding.UTF8.GetString(basicDeliverEventArgs.Body);
bool handledMessage = false;
var message = JsonSerializer.DeserializeFromString<TMessage>(messageBody);
if (someFunctionThatReturnsBoolAfterHandling(message))
channel.BasicAck(basicDeliverEventArgs.DeliveryTag, false); //acknowledge receipt
}
catch (EndOfStreamException ex)
{
Log.Error("The underlying RabbitMq channel was closed", ex);
throw new OperationInterruptedException(new ShutdownEventArgs(ShutdownInitiator.Application,0,"Connection closed"));
}
catch (Exception ex)
{
Log.Error("Error processing message " + ex);
Thread.Sleep(1000);
}
}
}
catch (OperationInterruptedException ex)
{
#if DEBUG
Debugger.Break();
#endif
Log.Warn("Connection lost", ex);
using (channel)
{
consumer = null;
channel = null;
_connection = null;
}
Thread.Sleep(1000);
}
}
Log.Debug("RabbitMQWorkerFunction finished - disposing of channel");
if (channel != null) channel.Close();
if (_connection != null) _connection.Close();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment