Skip to content

Instantly share code, notes, and snippets.

@prabirshrestha
Created July 11, 2012 20:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save prabirshrestha/3093080 to your computer and use it in GitHub Desktop.
Save prabirshrestha/3093080 to your computer and use it in GitHub Desktop.
azure queue poller
private async Task BatchPoll()
{
var cts = new CancellationTokenSource();
var task = CloudQueuePoller.Poll(c =>
{
var connectionString = "UseDevelopmentStorage=true";
var storageAccount = CloudStorageAccount.Parse(connectionString);
var queueClient = storageAccount.CreateCloudQueueClient();
var queue = queueClient.GetQueueReference(QueueName);
// you can return different instances of queue depending on your partitioning algorithm.
// the return queue can be random, if you want to have certain priorities to a particular partition
c.GetQueue = () =>
{
var tcs = new TaskCompletionSource<CloudQueue>();
tcs.TrySetResult(queue);
return tcs.Task;
};
// the number of messages to dequeue per single rest call to azure
// azure considers one rest call as a transaction, this helps reduce the azure transaction cost.
c.DequeueMessageBatchSize = 10;
// Number of max parallelism to use when dequeuing the queue.
// the poller can dequeue (DequeueThreadCount * DequeueMessageBatchSize) messages per time.
c.DequeueMaxDegreeOfParallelism = 1;
// used only when using OnMessage
// number of parallelism per dequeue thread
c.MaxDegreeOfParallelismForMessageProcessPerDequeue = 1;
// optional cancellation token, if you want to support cancellation of polling.
c.CancellationToken = cts.Token;
// azure message visibility timeout
c.MessageVisibilityTimeout = TimeSpan.FromSeconds(60);
// number of times to retry before giving up
c.RetryCount = 3;
// callback when an error occurs
// q: queue
// ex: the exception that occurred
// obj: object of type either CloudQueueMessage or IEnumerable<CloudQueueMessage>. (can be null)
// isMaxRetry: flag to indicate whether the retry has reached the max limit
c.OnError = (q, ex, obj, isMaxRetry) =>
{
System.Diagnostics.Debug.WriteLine("isMaxRetry: " + isMaxRetry + " - " + ex.Message);
if (isMaxRetry)
{
// make sure to persist the message(s) (ex: a poision queue/db) if isMaxRetry is true
// otherwise the message will be lost forever
// obj can either be of type IEnumerable<CloudQueueMessage> or CloudQueueMessage or null.
}
var tcs = new TaskCompletionSource<object>();
tcs.TrySetResult(null);
return tcs.Task;
};
// callback to call when messages has been dequeued.
c.OnMessages = (q, messages) =>
{
foreach (var message in messages)
Console.WriteLine(message.AsString);
var tcs = new TaskCompletionSource<object>();
tcs.TrySetResult(null);
return tcs.Task;
};
// Note: If you want to process single message use the singular OnMessage instead.
// set either OnMessage or OnMessages and not both of them.
// You might also want to increase the MessageVisibilityTimeout when processing single messages.
//
// c.OnMessage = (q, message) =>
// {
// Console.WriteLine(message.AsString);
//
// var tcs = new TaskCompletionSource<object>();
// tcs.TrySetResult(null);
// return tcs.Task;
// };
//
});
await CloudQueuePoller.Delay(TimeSpan.FromSeconds(20));
cts.Cancel();
try
{
await task;
}
catch (TaskCanceledException ex)
{
Console.WriteLine("Polling cancelled");
}
}
#define ASYNC_TARGETTING_PACK
namespace QueuePolling
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.WindowsAzure.StorageClient;
public class CloudQueuePollSettings
{
/// <summary>
/// Gets the queue to process. (can implement partitioning algorithms)
/// </summary>
public Func<Task<CloudQueue>> GetQueue { get; set; }
/// <summary>
/// Gets or sets the task to execute when queue is empty. Put the delay logic here
/// </summary>
/// <remarks>
/// Executing OnQueueShould never throw exception.
/// </remarks>
public Func<CloudQueue, Task> OnQueueEmpty { get; set; }
/// <summary>
/// Gets or sets the batch message processor.
/// </summary>
/// <remarks>
/// If exception is thrown when calling OnMessages, it will regard these queue messages as failed
/// </remarks>
public Func<CloudQueue, IEnumerable<CloudQueueMessage>, Task> OnMessages { get; set; }
/// <summary>
/// Gets or sets the message processor.
/// </summary>
/// <remarks>
/// If exception is thrown when calling OnMessage, it will regard this queue message as failed.
/// </remarks>
public Func<CloudQueue, CloudQueueMessage, Task> OnMessage { get; set; }
/// <summary>
/// Gets or sets the action when an error occurs. (CloudQueue, Exception, CloudQueueMessage/IEnumerable<CloudQueueMessage>, isMaxRetry)
/// </summary>
/// <remarks>
/// This method should never throw exception.
/// When isMaxRetry is set to true, you would want to add it to a poison queue as it will delete the message.
/// </remarks>
public Func<CloudQueue, Exception, object, bool, Task> OnError { get; set; }
/// <summary>
/// Gets or sets the number of threads to use for dequeuing.
/// </summary>
public int DequeueMaxDegreeOfParallelism { get; set; }
/// <summary>
/// Gets or sets the number of messages to dequeue per request.
/// </summary>
public int DequeueMessageBatchSize { get; set; }
/// <summary>
/// Gets or sets the maximum number of threads to use to process messages per dequeue thread.
/// </summary>
/// <remarks>
/// todo: need a better name for this
/// </remarks>
public int MaxDegreeOfParallelismForMessageProcessPerDequeue { get; set; }
/// <summary>
/// Gets or sets the number of times to retry before giving up.
/// </summary>
public int RetryCount { get; set; }
/// <summary>
/// Gets or sets the queue visibility timeout.
/// </summary>
public TimeSpan MessageVisibilityTimeout { get; set; }
/// <summary>
/// Gets or sets the timeout for requests made to azure queues.
/// </summary>
public TimeSpan RequestTimeout { get; set; }
/// <summary>
/// Gets or sets the cancellation token.
/// </summary>
public CancellationToken CancellationToken { get; set; }
public CloudQueuePollSettings()
{
DequeueMaxDegreeOfParallelism = 1;
DequeueMessageBatchSize = 32;
MaxDegreeOfParallelismForMessageProcessPerDequeue = 1;
RetryCount = 3;
CancellationToken = CancellationToken.None;
MessageVisibilityTimeout = TimeSpan.FromSeconds(60);
RequestTimeout = TimeSpan.FromSeconds(15);
OnQueueEmpty = queue => CloudQueuePoller.Delay(TimeSpan.FromSeconds(30));
OnError = (q, ex, obj, isMaxRetry) =>
{
Debug.WriteLine("isMaxRetry: " + isMaxRetry + " - " + ex.Message);
if (isMaxRetry)
{
// make sure to persist message (ex: a poision queue) if isMaxRetry is true
// otherwise the message will be lost forever
// obj can either be of type IEnumerable<CloudQueueMessage> or CloudQueueMessage or null.
}
var tcs = new TaskCompletionSource<object>();
tcs.TrySetResult(null);
return tcs.Task;
};
}
}
public static class CloudQueuePoller
{
public static Task Poll(Action<CloudQueuePollSettings> config)
{
var settings = new CloudQueuePollSettings();
if (config != null)
config(settings);
return Poll(settings);
}
public static Task Delay(TimeSpan delay, CancellationToken cancellationToken = default(CancellationToken))
{
#if ASYNC_TARGETTING_PACK
return TaskEx.Delay(delay, cancellationToken);
#else
return Task.Delay(delay, cancellationToken);
#endif
}
public static Task Poll(CloudQueuePollSettings settings)
{
if (settings == null)
throw new ArgumentNullException("settings");
if (settings.GetQueue == null)
throw new ArgumentNullException("Set GetQueue");
if ((settings.OnMessage == null && settings.OnMessages == null) ||
(settings.OnMessage != null && settings.OnMessages != null))
{
throw new ArgumentException("Set either OnMessage or OnMessages");
}
Parallel.For(0, settings.DequeueMaxDegreeOfParallelism, new ParallelOptions { MaxDegreeOfParallelism = settings.DequeueMaxDegreeOfParallelism }, async _ =>
{
while (true)
{
if (settings.CancellationToken.IsCancellationRequested)
break;
CloudQueue queue = null;
Exception exception = null;
try
{
queue = await settings.GetQueue();
}
catch (Exception ex)
{
exception = ex;
}
if (exception != null)
{
await settings.OnError(queue, exception, null, true);
continue;
}
// GetQueue can call cancel and return null
if (queue == null)
continue;
IEnumerable<CloudQueueMessage> messages = null;
int tempGetMessagesRetryCount = settings.RetryCount;
do
{
exception = null;
try
{
messages = await TimeoutAfter(Task.Factory.FromAsync<int, TimeSpan, IEnumerable<CloudQueueMessage>>(queue.BeginGetMessages, queue.EndGetMessages, settings.DequeueMessageBatchSize, settings.MessageVisibilityTimeout, null), settings.RequestTimeout);
break;
}
catch (Exception ex)
{
exception = ex;
}
--tempGetMessagesRetryCount;
if (exception != null)
await settings.OnError(queue, exception, null, tempGetMessagesRetryCount == 0);
} while (tempGetMessagesRetryCount >= 0);
if (exception != null)
{
continue;
}
if (!messages.Any())
{
await settings.OnQueueEmpty(queue);
continue;
}
if (settings.OnMessages != null)
{
// batch process
try
{
exception = null;
await settings.OnMessages(queue, messages);
await DeleteMessages(settings, queue, messages);
}
catch (Exception ex)
{
exception = ex;
}
if (exception != null)
{
var errorOutMessages = messages.Where(m => m.DequeueCount >= settings.RetryCount);
var nonErrorOutMessages = messages.Where(m => m.DequeueCount < settings.RetryCount);
if (errorOutMessages.Any())
{
await settings.OnError(queue, exception, errorOutMessages, true);
await DeleteMessages(settings, queue, errorOutMessages);
}
if (nonErrorOutMessages.Any())
await settings.OnError(queue, exception, nonErrorOutMessages, false);
}
}
else
{
// single message process
Parallel.ForEach(messages, new ParallelOptions { MaxDegreeOfParallelism = settings.MaxDegreeOfParallelismForMessageProcessPerDequeue }, async message =>
{
Exception singleMessageProcessException; // need a new local exception here
try
{
singleMessageProcessException = null;
await settings.OnMessage(queue, message);
await DeleteMessage(settings, queue, message);
}
catch (Exception ex)
{
singleMessageProcessException = ex;
}
if (singleMessageProcessException != null)
{
if (message.DequeueCount > settings.RetryCount)
{
await settings.OnError(queue, singleMessageProcessException, message, true);
await DeleteMessage(settings, queue, message);
}
else
await settings.OnError(queue, singleMessageProcessException, message, false);
}
});
}
}
});
var tcs = new TaskCompletionSource<object>();
tcs.TrySetCanceled();
return tcs.Task;
}
private async static Task DeleteMessages(CloudQueuePollSettings settings, CloudQueue queue, IEnumerable<CloudQueueMessage> messages)
{
foreach (var message in messages)
{
int retryCount = settings.RetryCount;
do
{
Exception exception = null;
try
{
await TimeoutAfter(Task.Factory.FromAsync(queue.BeginDeleteMessage, queue.EndDeleteMessage, message, null), settings.RequestTimeout);
break;
}
catch (StorageClientException ex)
{
// http://blog.smarx.com/posts/deleting-windows-azure-queue-messages-handling-exceptions
if (ex.ExtendedErrorInformation.ErrorCode == "MessageNotFound")
{
// pop receipt must be invalid
// ignore or log (so we can tune the visibility timeout)
}
else
{
// not the error we were expecting
exception = ex;
}
}
catch (Exception ex)
{
exception = ex;
}
--retryCount;
if (exception != null)
await settings.OnError(queue, exception, message, retryCount == 0);
} while (retryCount >= 0);
}
}
private async static Task DeleteMessage(CloudQueuePollSettings settings, CloudQueue queue, CloudQueueMessage message)
{
int retryCount = settings.RetryCount;
do
{
Exception exception = null;
try
{
await TimeoutAfter(Task.Factory.FromAsync(queue.BeginDeleteMessage, queue.EndDeleteMessage, message, null), settings.RequestTimeout);
}
catch (StorageClientException ex)
{
// http://blog.smarx.com/posts/deleting-windows-azure-queue-messages-handling-exceptions
if (ex.ExtendedErrorInformation.ErrorCode == "MessageNotFound")
{
// pop receipt must be invalid
// ignore or log (so we can tune the visibility timeout)
}
else
{
// not the error we were expecting
exception = ex;
}
}
catch (Exception ex)
{
exception = ex;
}
--retryCount;
if (exception != null)
await settings.OnError(queue, exception, message, retryCount == 0);
} while (retryCount >= 0);
}
private static async Task<T> TimeoutAfter<T>(Task<T> task, TimeSpan dueDate)
{
// http://blogs.msdn.com/b/pfxteam/archive/2011/11/10/10235834.aspx
// infinite timeout or task already completed
if (task.IsCompleted || (dueDate.TotalMilliseconds == Timeout.Infinite))
{
// Either the task has already completed or timeout will never occur.
// No proxy necessary.
return await task;
}
// zero timeout
if (dueDate.TotalMilliseconds <= 0)
{
// We've already timed out.
throw new TimeoutException();
}
#if ASYNC_TARGETTING_PACK
if (task == await TaskEx.WhenAny(task, TaskEx.Delay(dueDate)))
#else
if (task == await Task.WhenAny(task, Task.Delay(dueDate)))
#endif
return await task;
else
throw new TimeoutException();
}
private static async Task TimeoutAfter(Task task, TimeSpan dueDate)
{
// http://blogs.msdn.com/b/pfxteam/archive/2011/11/10/10235834.aspx)
// infinite timeout or task already completed
if (task.IsCompleted || (dueDate.TotalMilliseconds == Timeout.Infinite))
{
// Either the task has already completed or timeout will never occur.
// No proxy necessary.
await task;
}
// zero timeout
if (dueDate.TotalMilliseconds <= 0)
{
// We've already timed out.
throw new TimeoutException();
}
#if ASYNC_TARGETTING_PACK
if (task == await TaskEx.WhenAny(task, TaskEx.Delay(dueDate)))
#else
if (task == await Task.WhenAny(task, Task.Delay(dueDate)))
#endif
await task;
else
throw new TimeoutException();
}
}
}
@Daniel15
Copy link

Based on the "async" and "await" keywords, I assume this only works on .NET Framework 4.5?

@prabirshrestha
Copy link
Author

You can add this file in a .net 4.5 project and compile it using VS2012 then reference the output dll along with the async targeting pack in .net 4.0 http://blogs.msdn.com/b/csharpfaq/archive/2012/04/26/async-targeting-pack-for-visual-studio-11-now-available-for-net-4-and-silverlight-5.aspx

The code is currently incomplete, check in 2-3 days most likely I will have a complete version with exception handling by then.

@federicoboerr
Copy link

One small improvement...
When creating the Task for reading messages, explicit that the task should be created attached to the parent task, so when the parent task ends, the task reading messages will also end at line 164:

Task.Factory.FromAsync<...>(queue.BeginGetMessages, queue.EndGetMessages,..., null, TaskCreationOptions.AttachedToParent);

@prabirshrestha
Copy link
Author

@federicoboerr Task.Factory.FromAsync now includes TaskCreationOptions.AttachedToParent.

@prabirshrestha
Copy link
Author

@federicoboerr removed TaskCreationOptions.AttachedToParent when calling FromAsync<...>queue.BeginGetMessages so it correctly propagates the exception when we use it with TimeoutAfter rather then crashing the app.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment