-
-
Save prabirshrestha/3093080 to your computer and use it in GitHub Desktop.
private async Task BatchPoll() | |
{ | |
var cts = new CancellationTokenSource(); | |
var task = CloudQueuePoller.Poll(c => | |
{ | |
var connectionString = "UseDevelopmentStorage=true"; | |
var storageAccount = CloudStorageAccount.Parse(connectionString); | |
var queueClient = storageAccount.CreateCloudQueueClient(); | |
var queue = queueClient.GetQueueReference(QueueName); | |
// you can return different instances of queue depending on your partitioning algorithm. | |
// the return queue can be random, if you want to have certain priorities to a particular partition | |
c.GetQueue = () => | |
{ | |
var tcs = new TaskCompletionSource<CloudQueue>(); | |
tcs.TrySetResult(queue); | |
return tcs.Task; | |
}; | |
// the number of messages to dequeue per single rest call to azure | |
// azure considers one rest call as a transaction, this helps reduce the azure transaction cost. | |
c.DequeueMessageBatchSize = 10; | |
// Number of max parallelism to use when dequeuing the queue. | |
// the poller can dequeue (DequeueThreadCount * DequeueMessageBatchSize) messages per time. | |
c.DequeueMaxDegreeOfParallelism = 1; | |
// used only when using OnMessage | |
// number of parallelism per dequeue thread | |
c.MaxDegreeOfParallelismForMessageProcessPerDequeue = 1; | |
// optional cancellation token, if you want to support cancellation of polling. | |
c.CancellationToken = cts.Token; | |
// azure message visibility timeout | |
c.MessageVisibilityTimeout = TimeSpan.FromSeconds(60); | |
// number of times to retry before giving up | |
c.RetryCount = 3; | |
// callback when an error occurs | |
// q: queue | |
// ex: the exception that occurred | |
// obj: object of type either CloudQueueMessage or IEnumerable<CloudQueueMessage>. (can be null) | |
// isMaxRetry: flag to indicate whether the retry has reached the max limit | |
c.OnError = (q, ex, obj, isMaxRetry) => | |
{ | |
System.Diagnostics.Debug.WriteLine("isMaxRetry: " + isMaxRetry + " - " + ex.Message); | |
if (isMaxRetry) | |
{ | |
// make sure to persist the message(s) (ex: a poision queue/db) if isMaxRetry is true | |
// otherwise the message will be lost forever | |
// obj can either be of type IEnumerable<CloudQueueMessage> or CloudQueueMessage or null. | |
} | |
var tcs = new TaskCompletionSource<object>(); | |
tcs.TrySetResult(null); | |
return tcs.Task; | |
}; | |
// callback to call when messages has been dequeued. | |
c.OnMessages = (q, messages) => | |
{ | |
foreach (var message in messages) | |
Console.WriteLine(message.AsString); | |
var tcs = new TaskCompletionSource<object>(); | |
tcs.TrySetResult(null); | |
return tcs.Task; | |
}; | |
// Note: If you want to process single message use the singular OnMessage instead. | |
// set either OnMessage or OnMessages and not both of them. | |
// You might also want to increase the MessageVisibilityTimeout when processing single messages. | |
// | |
// c.OnMessage = (q, message) => | |
// { | |
// Console.WriteLine(message.AsString); | |
// | |
// var tcs = new TaskCompletionSource<object>(); | |
// tcs.TrySetResult(null); | |
// return tcs.Task; | |
// }; | |
// | |
}); | |
await CloudQueuePoller.Delay(TimeSpan.FromSeconds(20)); | |
cts.Cancel(); | |
try | |
{ | |
await task; | |
} | |
catch (TaskCanceledException ex) | |
{ | |
Console.WriteLine("Polling cancelled"); | |
} | |
} |
#define ASYNC_TARGETTING_PACK | |
namespace QueuePolling | |
{ | |
using System; | |
using System.Collections.Generic; | |
using System.Diagnostics; | |
using System.Linq; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Microsoft.WindowsAzure.StorageClient; | |
public class CloudQueuePollSettings | |
{ | |
/// <summary> | |
/// Gets the queue to process. (can implement partitioning algorithms) | |
/// </summary> | |
public Func<Task<CloudQueue>> GetQueue { get; set; } | |
/// <summary> | |
/// Gets or sets the task to execute when queue is empty. Put the delay logic here | |
/// </summary> | |
/// <remarks> | |
/// Executing OnQueueShould never throw exception. | |
/// </remarks> | |
public Func<CloudQueue, Task> OnQueueEmpty { get; set; } | |
/// <summary> | |
/// Gets or sets the batch message processor. | |
/// </summary> | |
/// <remarks> | |
/// If exception is thrown when calling OnMessages, it will regard these queue messages as failed | |
/// </remarks> | |
public Func<CloudQueue, IEnumerable<CloudQueueMessage>, Task> OnMessages { get; set; } | |
/// <summary> | |
/// Gets or sets the message processor. | |
/// </summary> | |
/// <remarks> | |
/// If exception is thrown when calling OnMessage, it will regard this queue message as failed. | |
/// </remarks> | |
public Func<CloudQueue, CloudQueueMessage, Task> OnMessage { get; set; } | |
/// <summary> | |
/// Gets or sets the action when an error occurs. (CloudQueue, Exception, CloudQueueMessage/IEnumerable<CloudQueueMessage>, isMaxRetry) | |
/// </summary> | |
/// <remarks> | |
/// This method should never throw exception. | |
/// When isMaxRetry is set to true, you would want to add it to a poison queue as it will delete the message. | |
/// </remarks> | |
public Func<CloudQueue, Exception, object, bool, Task> OnError { get; set; } | |
/// <summary> | |
/// Gets or sets the number of threads to use for dequeuing. | |
/// </summary> | |
public int DequeueMaxDegreeOfParallelism { get; set; } | |
/// <summary> | |
/// Gets or sets the number of messages to dequeue per request. | |
/// </summary> | |
public int DequeueMessageBatchSize { get; set; } | |
/// <summary> | |
/// Gets or sets the maximum number of threads to use to process messages per dequeue thread. | |
/// </summary> | |
/// <remarks> | |
/// todo: need a better name for this | |
/// </remarks> | |
public int MaxDegreeOfParallelismForMessageProcessPerDequeue { get; set; } | |
/// <summary> | |
/// Gets or sets the number of times to retry before giving up. | |
/// </summary> | |
public int RetryCount { get; set; } | |
/// <summary> | |
/// Gets or sets the queue visibility timeout. | |
/// </summary> | |
public TimeSpan MessageVisibilityTimeout { get; set; } | |
/// <summary> | |
/// Gets or sets the timeout for requests made to azure queues. | |
/// </summary> | |
public TimeSpan RequestTimeout { get; set; } | |
/// <summary> | |
/// Gets or sets the cancellation token. | |
/// </summary> | |
public CancellationToken CancellationToken { get; set; } | |
public CloudQueuePollSettings() | |
{ | |
DequeueMaxDegreeOfParallelism = 1; | |
DequeueMessageBatchSize = 32; | |
MaxDegreeOfParallelismForMessageProcessPerDequeue = 1; | |
RetryCount = 3; | |
CancellationToken = CancellationToken.None; | |
MessageVisibilityTimeout = TimeSpan.FromSeconds(60); | |
RequestTimeout = TimeSpan.FromSeconds(15); | |
OnQueueEmpty = queue => CloudQueuePoller.Delay(TimeSpan.FromSeconds(30)); | |
OnError = (q, ex, obj, isMaxRetry) => | |
{ | |
Debug.WriteLine("isMaxRetry: " + isMaxRetry + " - " + ex.Message); | |
if (isMaxRetry) | |
{ | |
// make sure to persist message (ex: a poision queue) if isMaxRetry is true | |
// otherwise the message will be lost forever | |
// obj can either be of type IEnumerable<CloudQueueMessage> or CloudQueueMessage or null. | |
} | |
var tcs = new TaskCompletionSource<object>(); | |
tcs.TrySetResult(null); | |
return tcs.Task; | |
}; | |
} | |
} | |
public static class CloudQueuePoller | |
{ | |
public static Task Poll(Action<CloudQueuePollSettings> config) | |
{ | |
var settings = new CloudQueuePollSettings(); | |
if (config != null) | |
config(settings); | |
return Poll(settings); | |
} | |
public static Task Delay(TimeSpan delay, CancellationToken cancellationToken = default(CancellationToken)) | |
{ | |
#if ASYNC_TARGETTING_PACK | |
return TaskEx.Delay(delay, cancellationToken); | |
#else | |
return Task.Delay(delay, cancellationToken); | |
#endif | |
} | |
public static Task Poll(CloudQueuePollSettings settings) | |
{ | |
if (settings == null) | |
throw new ArgumentNullException("settings"); | |
if (settings.GetQueue == null) | |
throw new ArgumentNullException("Set GetQueue"); | |
if ((settings.OnMessage == null && settings.OnMessages == null) || | |
(settings.OnMessage != null && settings.OnMessages != null)) | |
{ | |
throw new ArgumentException("Set either OnMessage or OnMessages"); | |
} | |
Parallel.For(0, settings.DequeueMaxDegreeOfParallelism, new ParallelOptions { MaxDegreeOfParallelism = settings.DequeueMaxDegreeOfParallelism }, async _ => | |
{ | |
while (true) | |
{ | |
if (settings.CancellationToken.IsCancellationRequested) | |
break; | |
CloudQueue queue = null; | |
Exception exception = null; | |
try | |
{ | |
queue = await settings.GetQueue(); | |
} | |
catch (Exception ex) | |
{ | |
exception = ex; | |
} | |
if (exception != null) | |
{ | |
await settings.OnError(queue, exception, null, true); | |
continue; | |
} | |
// GetQueue can call cancel and return null | |
if (queue == null) | |
continue; | |
IEnumerable<CloudQueueMessage> messages = null; | |
int tempGetMessagesRetryCount = settings.RetryCount; | |
do | |
{ | |
exception = null; | |
try | |
{ | |
messages = await TimeoutAfter(Task.Factory.FromAsync<int, TimeSpan, IEnumerable<CloudQueueMessage>>(queue.BeginGetMessages, queue.EndGetMessages, settings.DequeueMessageBatchSize, settings.MessageVisibilityTimeout, null), settings.RequestTimeout); | |
break; | |
} | |
catch (Exception ex) | |
{ | |
exception = ex; | |
} | |
--tempGetMessagesRetryCount; | |
if (exception != null) | |
await settings.OnError(queue, exception, null, tempGetMessagesRetryCount == 0); | |
} while (tempGetMessagesRetryCount >= 0); | |
if (exception != null) | |
{ | |
continue; | |
} | |
if (!messages.Any()) | |
{ | |
await settings.OnQueueEmpty(queue); | |
continue; | |
} | |
if (settings.OnMessages != null) | |
{ | |
// batch process | |
try | |
{ | |
exception = null; | |
await settings.OnMessages(queue, messages); | |
await DeleteMessages(settings, queue, messages); | |
} | |
catch (Exception ex) | |
{ | |
exception = ex; | |
} | |
if (exception != null) | |
{ | |
var errorOutMessages = messages.Where(m => m.DequeueCount >= settings.RetryCount); | |
var nonErrorOutMessages = messages.Where(m => m.DequeueCount < settings.RetryCount); | |
if (errorOutMessages.Any()) | |
{ | |
await settings.OnError(queue, exception, errorOutMessages, true); | |
await DeleteMessages(settings, queue, errorOutMessages); | |
} | |
if (nonErrorOutMessages.Any()) | |
await settings.OnError(queue, exception, nonErrorOutMessages, false); | |
} | |
} | |
else | |
{ | |
// single message process | |
Parallel.ForEach(messages, new ParallelOptions { MaxDegreeOfParallelism = settings.MaxDegreeOfParallelismForMessageProcessPerDequeue }, async message => | |
{ | |
Exception singleMessageProcessException; // need a new local exception here | |
try | |
{ | |
singleMessageProcessException = null; | |
await settings.OnMessage(queue, message); | |
await DeleteMessage(settings, queue, message); | |
} | |
catch (Exception ex) | |
{ | |
singleMessageProcessException = ex; | |
} | |
if (singleMessageProcessException != null) | |
{ | |
if (message.DequeueCount > settings.RetryCount) | |
{ | |
await settings.OnError(queue, singleMessageProcessException, message, true); | |
await DeleteMessage(settings, queue, message); | |
} | |
else | |
await settings.OnError(queue, singleMessageProcessException, message, false); | |
} | |
}); | |
} | |
} | |
}); | |
var tcs = new TaskCompletionSource<object>(); | |
tcs.TrySetCanceled(); | |
return tcs.Task; | |
} | |
private async static Task DeleteMessages(CloudQueuePollSettings settings, CloudQueue queue, IEnumerable<CloudQueueMessage> messages) | |
{ | |
foreach (var message in messages) | |
{ | |
int retryCount = settings.RetryCount; | |
do | |
{ | |
Exception exception = null; | |
try | |
{ | |
await TimeoutAfter(Task.Factory.FromAsync(queue.BeginDeleteMessage, queue.EndDeleteMessage, message, null), settings.RequestTimeout); | |
break; | |
} | |
catch (StorageClientException ex) | |
{ | |
// http://blog.smarx.com/posts/deleting-windows-azure-queue-messages-handling-exceptions | |
if (ex.ExtendedErrorInformation.ErrorCode == "MessageNotFound") | |
{ | |
// pop receipt must be invalid | |
// ignore or log (so we can tune the visibility timeout) | |
} | |
else | |
{ | |
// not the error we were expecting | |
exception = ex; | |
} | |
} | |
catch (Exception ex) | |
{ | |
exception = ex; | |
} | |
--retryCount; | |
if (exception != null) | |
await settings.OnError(queue, exception, message, retryCount == 0); | |
} while (retryCount >= 0); | |
} | |
} | |
private async static Task DeleteMessage(CloudQueuePollSettings settings, CloudQueue queue, CloudQueueMessage message) | |
{ | |
int retryCount = settings.RetryCount; | |
do | |
{ | |
Exception exception = null; | |
try | |
{ | |
await TimeoutAfter(Task.Factory.FromAsync(queue.BeginDeleteMessage, queue.EndDeleteMessage, message, null), settings.RequestTimeout); | |
} | |
catch (StorageClientException ex) | |
{ | |
// http://blog.smarx.com/posts/deleting-windows-azure-queue-messages-handling-exceptions | |
if (ex.ExtendedErrorInformation.ErrorCode == "MessageNotFound") | |
{ | |
// pop receipt must be invalid | |
// ignore or log (so we can tune the visibility timeout) | |
} | |
else | |
{ | |
// not the error we were expecting | |
exception = ex; | |
} | |
} | |
catch (Exception ex) | |
{ | |
exception = ex; | |
} | |
--retryCount; | |
if (exception != null) | |
await settings.OnError(queue, exception, message, retryCount == 0); | |
} while (retryCount >= 0); | |
} | |
private static async Task<T> TimeoutAfter<T>(Task<T> task, TimeSpan dueDate) | |
{ | |
// http://blogs.msdn.com/b/pfxteam/archive/2011/11/10/10235834.aspx | |
// infinite timeout or task already completed | |
if (task.IsCompleted || (dueDate.TotalMilliseconds == Timeout.Infinite)) | |
{ | |
// Either the task has already completed or timeout will never occur. | |
// No proxy necessary. | |
return await task; | |
} | |
// zero timeout | |
if (dueDate.TotalMilliseconds <= 0) | |
{ | |
// We've already timed out. | |
throw new TimeoutException(); | |
} | |
#if ASYNC_TARGETTING_PACK | |
if (task == await TaskEx.WhenAny(task, TaskEx.Delay(dueDate))) | |
#else | |
if (task == await Task.WhenAny(task, Task.Delay(dueDate))) | |
#endif | |
return await task; | |
else | |
throw new TimeoutException(); | |
} | |
private static async Task TimeoutAfter(Task task, TimeSpan dueDate) | |
{ | |
// http://blogs.msdn.com/b/pfxteam/archive/2011/11/10/10235834.aspx) | |
// infinite timeout or task already completed | |
if (task.IsCompleted || (dueDate.TotalMilliseconds == Timeout.Infinite)) | |
{ | |
// Either the task has already completed or timeout will never occur. | |
// No proxy necessary. | |
await task; | |
} | |
// zero timeout | |
if (dueDate.TotalMilliseconds <= 0) | |
{ | |
// We've already timed out. | |
throw new TimeoutException(); | |
} | |
#if ASYNC_TARGETTING_PACK | |
if (task == await TaskEx.WhenAny(task, TaskEx.Delay(dueDate))) | |
#else | |
if (task == await Task.WhenAny(task, Task.Delay(dueDate))) | |
#endif | |
await task; | |
else | |
throw new TimeoutException(); | |
} | |
} | |
} |
You can add this file in a .net 4.5 project and compile it using VS2012 then reference the output dll along with the async targeting pack in .net 4.0 http://blogs.msdn.com/b/csharpfaq/archive/2012/04/26/async-targeting-pack-for-visual-studio-11-now-available-for-net-4-and-silverlight-5.aspx
The code is currently incomplete, check in 2-3 days most likely I will have a complete version with exception handling by then.
One small improvement...
When creating the Task for reading messages, explicit that the task should be created attached to the parent task, so when the parent task ends, the task reading messages will also end at line 164:
Task.Factory.FromAsync<...>(queue.BeginGetMessages, queue.EndGetMessages,..., null, TaskCreationOptions.AttachedToParent);
@federicoboerr Task.Factory.FromAsync
now includes TaskCreationOptions.AttachedToParent
.
@federicoboerr removed TaskCreationOptions.AttachedToParent
when calling FromAsync<...>queue.BeginGetMessages
so it correctly propagates the exception when we use it with TimeoutAfter
rather then crashing the app.
Based on the "async" and "await" keywords, I assume this only works on .NET Framework 4.5?