Last active
December 27, 2015 01:09
-
-
Save kcuzner/7242836 to your computer and use it in GitHub Desktop.
Awaitable basic consumer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Collections.Concurrent; | |
using System.Linq; | |
using System.Text; | |
using System.Threading.Tasks; | |
using System.IO; | |
using System.Threading; | |
namespace Common | |
{ | |
/// <summary>
/// An <see cref="RabbitMQ.Client.IBasicConsumer"/> implementation that exposes deliveries as awaitable tasks.
/// Callers obtain a task from <see cref="DequeueAsync"/>; each delivery handed to <see cref="HandleBasicDeliver"/>
/// completes the oldest still-pending task, or is buffered until somebody asks for it.
/// </summary>
public class AwaitableBasicConsumer : RabbitMQ.Client.IBasicConsumer
{
    /// <summary>
    /// Completion sources of callers that are currently waiting for a delivery, oldest first.
    /// May contain already-cancelled entries; those are skipped when a delivery arrives.
    /// </summary>
    private readonly ConcurrentQueue<TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs>> deliveryTCS = new ConcurrentQueue<TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs>>();

    /// <summary>
    /// Deliveries that arrived while nobody was waiting, oldest first.
    /// </summary>
    private readonly ConcurrentQueue<RabbitMQ.Client.Events.BasicDeliverEventArgs> undelivered = new ConcurrentQueue<RabbitMQ.Client.Events.BasicDeliverEventArgs>();

    /// <summary>
    /// Coordinates access to the two queues. Many callers may register themselves at once (read lock), but while a
    /// delivery is being matched to a waiter (write lock) both queues must stay unchanged, otherwise a waiter could
    /// enqueue itself between "no waiter found" and "delivery buffered" and messages would be handed out of order.
    /// Recursion is allowed so that a continuation running synchronously on the delivering thread can call
    /// <see cref="DequeueAsync"/> again without throwing.
    /// </summary>
    private readonly ReaderWriterLockSlim rwLock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);

    /// <summary>
    /// The model this consumer was created for.
    /// </summary>
    public RabbitMQ.Client.IModel Model { get; private set; }

    /// <summary>
    /// Creates a consumer bound to the given model.
    /// </summary>
    /// <param name="model">The model exposed through <see cref="Model"/>; it is stored as-is.</param>
    public AwaitableBasicConsumer(RabbitMQ.Client.IModel model)
    {
        this.Model = model;
    }

    /// <summary>
    /// Consumer callback for a cancel notification: every task that is still waiting becomes cancelled.
    /// </summary>
    /// <param name="consumerTag">Unused.</param>
    public void HandleBasicCancel(string consumerTag)
    {
        this.CancelAllWaiting();
    }

    /// <summary>
    /// Consumer callback for a cancel acknowledgement: every task that is still waiting becomes cancelled.
    /// </summary>
    /// <param name="consumerTag">Unused.</param>
    public void HandleBasicCancelOk(string consumerTag)
    {
        this.CancelAllWaiting();
    }

    /// <summary>
    /// Consumer callback for a consume acknowledgement. Intentionally does nothing.
    /// </summary>
    /// <param name="consumerTag">Unused.</param>
    public void HandleBasicConsumeOk(string consumerTag)
    {
        //we do nothing
    }

    /// <summary>
    /// Consumer callback for a delivery. The delivery is given to the oldest waiter that has not already completed
    /// (for example through cancellation); if there is no such waiter it is buffered for a later
    /// <see cref="DequeueAsync"/> call.
    /// </summary>
    public void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties properties, byte[] body)
    {
        //nothing may be added to either queue while we match this delivery to a waiter, so take the write lock
        rwLock.EnterWriteLock();
        try
        {
            RabbitMQ.Client.Events.BasicDeliverEventArgs e = new RabbitMQ.Client.Events.BasicDeliverEventArgs(consumerTag, deliveryTag, redelivered, exchange, routingKey, properties, body);
            bool sent = false;
            TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs;
            while (deliveryTCS.TryDequeue(out tcs))
            {
                //TrySetResult fails for waiters that were cancelled in the meantime; skip those and try the next one
                //NOTE(review): continuations attached to the task may run synchronously here, i.e. while the write lock is held
                if (tcs.TrySetResult(e))
                {
                    sent = true;
                    break;
                }
            }

            //Nobody took it, so keep it for whoever asks next. Without the write lock a waiter could enqueue itself
            //right after the loop above gave up: that waiter would then receive the NEXT message, and the waiter
            //after it would receive this one.
            if (!sent)
            {
                undelivered.Enqueue(e);
            }
        }
        finally
        {
            rwLock.ExitWriteLock();
        }
    }

    /// <summary>
    /// Consumer callback for model shutdown: every task that is still waiting is faulted with an
    /// <see cref="EndOfStreamException"/>.
    /// </summary>
    /// <param name="model">Unused.</param>
    /// <param name="reason">Unused.</param>
    public void HandleModelShutdown(RabbitMQ.Client.IModel model, RabbitMQ.Client.ShutdownEventArgs reason)
    {
        //it is the end of the stream; draining an empty queue is a no-op, so no emptiness check is needed
        this.TerminateAllWaiting(new EndOfStreamException());
    }

    /// <summary>
    /// Sets all waiting tasks to cancelled
    /// </summary>
    private void CancelAllWaiting()
    {
        TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs;
        while (deliveryTCS.TryDequeue(out tcs))
        {
            tcs.TrySetCanceled();
        }
    }

    /// <summary>
    /// Sets all waiting tasks to faulted with the given exception
    /// </summary>
    /// <param name="e">The exception every waiting task will carry</param>
    private void TerminateAllWaiting(Exception e)
    {
        TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs;
        while (deliveryTCS.TryDequeue(out tcs))
        {
            tcs.TrySetException(e);
        }
    }

    /// <summary>
    /// Asynchronous method for waiting for the next delivery
    /// </summary>
    /// <param name="cancellationToken">Cancels the returned task if it has not completed yet</param>
    /// <returns>
    /// A task that completes with the next delivery. It is already completed when a buffered delivery was available,
    /// and already cancelled when <paramref name="cancellationToken"/> was cancelled on entry.
    /// </returns>
    public Task<RabbitMQ.Client.Events.BasicDeliverEventArgs> DequeueAsync(CancellationToken cancellationToken)
    {
        TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs = new TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs>();

        //a caller that is already cancelled must not consume a buffered delivery
        if (cancellationToken.IsCancellationRequested)
        {
            tcs.SetCanceled();
            return tcs.Task;
        }

        //we are enqueueing. This is a "read"
        rwLock.EnterReadLock();
        try
        {
            //The cancellation callback is deliberately not registered yet: registering on a cancelled token runs
            //the callback immediately, which would make SetResult throw after the delivery was already dequeued.
            if (TryDeliverUndelivered(tcs))
            {
                return tcs.Task;
            }

            deliveryTCS.Enqueue(tcs);
        }
        finally
        {
            rwLock.ExitReadLock();
        }

        if (cancellationToken.CanBeCanceled)
        {
            //if we are cancelled before a delivery arrives, this will cause the tcs to become cancelled
            CancellationTokenRegistration registration = cancellationToken.Register(() =>
            {
                tcs.TrySetCanceled();
            });

            //drop the registration once the task is done so long-lived tokens do not accumulate callbacks
            tcs.Task.ContinueWith(completed => registration.Dispose(), TaskContinuationOptions.ExecuteSynchronously);
        }

        return tcs.Task;
    }

    /// <summary>
    /// Attempts to complete the given waiter with a buffered delivery. WARNING: Does not enter the lock. The calling
    /// method should ensure the lock is properly entered, and must pass a completion source that has not completed.
    /// </summary>
    /// <param name="tcs">The not-yet-completed completion source of the caller</param>
    /// <returns>True if something was delivered to them</returns>
    private bool TryDeliverUndelivered(TaskCompletionSource<RabbitMQ.Client.Events.BasicDeliverEventArgs> tcs)
    {
        RabbitMQ.Client.Events.BasicDeliverEventArgs e;
        if (undelivered.TryDequeue(out e))
        {
            //congratulations! you just got synchronously delivered with something we received with no consumer to give it to
            tcs.SetResult(e);
            return true; //we return true so the caller knows that they had something delivered to them
        }

        //there was nothing to give them, so they get to wait
        return false;
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment