Skip to content

Instantly share code, notes, and snippets.

@kcuzner
Last active December 27, 2015 01:09
Show Gist options
  • Save kcuzner/7242836 to your computer and use it in GitHub Desktop.
Save kcuzner/7242836 to your computer and use it in GitHub Desktop.
Awaitable basic consumer
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.IO;
using System.Threading;
namespace Common
{
/// <summary>
/// A RabbitMQ consumer whose deliveries can be awaited through <see cref="DequeueAsync"/>.
/// Deliveries that arrive while nobody is waiting are buffered and handed to later callers
/// in arrival order; callers that arrive while nothing is buffered are queued and completed
/// in arrival order by subsequent deliveries.
/// </summary>
public class AwaitableBasicConsumer : RabbitMQ.Client.IBasicConsumer
{
    /// <summary>
    /// Callers waiting for a delivery, oldest first. May contain sources that were already
    /// cancelled; those are skipped when a delivery arrives.
    /// </summary>
    private readonly ConcurrentQueue<TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs>> deliveryTCS = new ConcurrentQueue<TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs>>();

    /// <summary>
    /// Deliveries received while no caller was waiting, oldest first.
    /// </summary>
    private readonly ConcurrentQueue<RabbitMQ.Client.Events.BasicDeliverEventArgs> undelivered = new ConcurrentQueue<RabbitMQ.Client.Events.BasicDeliverEventArgs>();

    /// <summary>
    /// Coordinates the two queues. Callers of <see cref="DequeueAsync"/> take the read lock (many may
    /// enqueue themselves at once); <see cref="HandleBasicDeliver"/> takes the write lock so that nobody
    /// can enqueue themselves between "no waiter found" and "message buffered".
    /// Recursion is enabled because a task continuation that runs synchronously inside
    /// <see cref="HandleBasicDeliver"/> may call <see cref="DequeueAsync"/> on the same thread.
    /// </summary>
    private readonly ReaderWriterLockSlim rwLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);

    /// <summary>
    /// The model this consumer was created for.
    /// </summary>
    public RabbitMQ.Client.IModel Model { get; private set; }

    /// <summary>
    /// Creates a consumer bound to the given model.
    /// </summary>
    /// <param name="model">The model exposed through <see cref="Model"/>.</param>
    public AwaitableBasicConsumer(RabbitMQ.Client.IModel model)
    {
        this.Model = model;
    }

    /// <summary>
    /// Cancels every task currently waiting for a delivery.
    /// </summary>
    /// <param name="consumerTag">Unused.</param>
    public void HandleBasicCancel(string consumerTag)
    {
        this.CancelAllWaiting();
    }

    /// <summary>
    /// Cancels every task currently waiting for a delivery.
    /// </summary>
    /// <param name="consumerTag">Unused.</param>
    public void HandleBasicCancelOk(string consumerTag)
    {
        this.CancelAllWaiting();
    }

    /// <summary>
    /// Does nothing; present to satisfy the consumer interface.
    /// </summary>
    /// <param name="consumerTag">Unused.</param>
    public void HandleBasicConsumeOk(string consumerTag)
    {
        //we do nothing
    }

    /// <summary>
    /// Hands a delivery to the oldest waiting caller that can still accept it, or buffers it
    /// when there is none.
    /// </summary>
    public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body)
    {
        //building the event args touches no shared state, so it does not need the lock
        RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);

        //we want nothing added while we remove. We also block until everybody is done.
        rwLock.EnterWriteLock();
        try
        {
            TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs;
            while (deliveryTCS.TryDequeue(out tcs))
            {
                //TrySetResult fails for waiters that were already cancelled or faulted; skip those.
                //Once we manage to actually set somebody's result, we are done with handling this.
                if (tcs.TrySetResult(e))
                {
                    return;
                }
            }

            //Nobody took it, so keep it for the next caller. Without the write lock a caller could
            //enqueue itself right after the loop above gave up; it would then receive the *next*
            //message while a later caller would receive this one, reordering deliveries.
            undelivered.Enqueue(e);
        }
        finally
        {
            rwLock.ExitWriteLock();
        }
    }

    /// <summary>
    /// Faults every task currently waiting for a delivery with an <see cref="EndOfStreamException"/>.
    /// </summary>
    /// <param name="model">Unused.</param>
    /// <param name="reason">Unused.</param>
    public void HandleModelShutdown(RabbitMQ.Client.IModel model, RabbitMQ.Client.ShutdownEventArgs reason)
    {
        //it is the end of the stream; draining an empty queue is a no-op, so no emptiness check is needed
        this.TerminateAllWaiting(new EndOfStreamException());
    }

    /// <summary>
    /// Sets all waiting tasks to cancelled
    /// </summary>
    private void CancelAllWaiting()
    {
        TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs;
        while (deliveryTCS.TryDequeue(out tcs))
        {
            tcs.TrySetCanceled();
        }
    }

    /// <summary>
    /// Sets all waiting tasks to faulted with the given exception
    /// </summary>
    /// <param name="e">The exception every waiting task will carry.</param>
    private void TerminateAllWaiting(Exception e)
    {
        TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs;
        while (deliveryTCS.TryDequeue(out tcs))
        {
            tcs.TrySetException(e);
        }
    }

    /// <summary>
    /// Asynchronous method for waiting for the next deliver
    /// </summary>
    /// <param name="cancellationToken">Cancels the returned task if it has not received a delivery yet.</param>
    /// <returns>
    /// A task that completes with the oldest buffered delivery if one exists, otherwise with a later
    /// delivery. It is cancelled if <paramref name="cancellationToken"/> fires first or the consumer is
    /// cancelled, and faulted with <see cref="EndOfStreamException"/> if the model shuts down while waiting.
    /// </returns>
    public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken)
    {
        TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs = new TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs>();

        //a caller that is already cancelled must not consume (and thereby lose) a buffered message
        if (cancellationToken.IsCancellationRequested)
        {
            tcs.SetCanceled();
            return tcs.Task;
        }

        //we are enqueueing. This is a "read"
        rwLock.EnterReadLock();
        try
        {
            //tcs has not been handed to anyone yet, so it cannot have been completed behind our back
            if (TryDeliverUndelivered(tcs))
            {
                return tcs.Task;
            }
            deliveryTCS.Enqueue(tcs);
        }
        finally
        {
            rwLock.ExitReadLock();
        }

        //Hook up cancellation only once we are actually waiting. If the token fired in the meantime,
        //Register invokes the callback immediately and the task ends up cancelled.
        if (cancellationToken.CanBeCanceled)
        {
            CancellationTokenRegistration registration = cancellationToken.Register(() => tcs.TrySetCanceled());
            //drop the callback as soon as the task is done so long-lived tokens do not accumulate registrations
            tcs.Task.ContinueWith(
                completed => registration.Dispose(),
                CancellationToken.None,
                TaskContinuationOptions.ExecuteSynchronously,
                TaskScheduler.Default);
        }
        return tcs.Task;
    }

    /// <summary>
    /// Attempts to deliver an undelivered thing. WARNING: Does not enter the lock. The calling method should ensure the lock is properly entered.
    /// The passed source must not have been completed yet: SetResult throws on a completed source and the dequeued delivery would be lost.
    /// </summary>
    /// <param name="tcs">The not-yet-completed source that receives the buffered delivery, if there is one.</param>
    /// <returns>True if something was delivered to them</returns>
    private bool TryDeliverUndelivered(TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs)
    {
        RabbitMQ.Client.Events.BasicDeliverEventArgs e;
        if (!undelivered.TryDequeue(out e))
        {
            //there was nothing to give them, so they get to wait
            return false;
        }

        //synchronously hand over something we received while nobody was waiting for it
        tcs.SetResult(e);
        return true;
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment