Skip to content

Instantly share code, notes, and snippets.

@clemensv
Created September 6, 2012 06:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save clemensv/3652025 to your computer and use it in GitHub Desktop.
Save clemensv/3652025 to your computer and use it in GitHub Desktop.
Send with retry
namespace MartVue.Common
{
using System;
using System.Runtime.Serialization;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;
public static class RobustSendExtension
{
static readonly TaskFactory TaskFactory = new TaskFactory();
public static async Task SendWithRetryAsync(this MessageSender sender,
object serializableObject,
XmlObjectSerializer serializer = null,
Action<BrokeredMessage> setProperties = null,
int maxRetries = 5)
{
await SendWithRetryAsyncInternal(sender.BeginSend, sender.EndSend, serializableObject, serializer,
setProperties,
maxRetries);
}
public static async Task SendWithRetryAsync(this MessageSender sender,
BrokeredMessage msg,
Action<BrokeredMessage> setProperties = null,
int maxRetries = 5)
{
await SendWithRetryAsyncInternal(sender.BeginSend, sender.EndSend, msg, setProperties, maxRetries);
}
public static async Task SendWithRetryAsync(this MessageSender sender,
Func<BrokeredMessage> getMessage,
Action<BrokeredMessage> setProperties = null,
int maxRetries = 5)
{
await SendWithRetryAsyncInternal(sender.BeginSend, sender.EndSend, getMessage, setProperties, maxRetries);
}
public static async Task SendWithRetryAsync(this QueueClient sender,
object serializableObject,
XmlObjectSerializer serializer = null,
Action<BrokeredMessage> setProperties = null,
int maxRetries = 5)
{
await SendWithRetryAsyncInternal(sender.BeginSend, sender.EndSend, serializableObject, serializer,
setProperties,
maxRetries);
}
public static async Task SendWithRetryAsync(this QueueClient sender,
BrokeredMessage msg,
Action<BrokeredMessage> setProperties = null,
int maxRetries = 5)
{
await SendWithRetryAsyncInternal(sender.BeginSend, sender.EndSend, msg, setProperties, maxRetries);
}
public static async Task SendWithRetryAsync(this QueueClient sender,
Func<BrokeredMessage> getMessage,
Action<BrokeredMessage> setProperties = null,
int maxRetries = 5)
{
await SendWithRetryAsyncInternal(sender.BeginSend, sender.EndSend, getMessage, setProperties, maxRetries);
}
public static async Task SendWithRetryAsync(this TopicClient sender,
object serializableObject,
XmlObjectSerializer serializer = null,
Action<BrokeredMessage> setProperties = null,
int maxRetries = 5)
{
await
SendWithRetryAsyncInternal(sender.BeginSend, sender.EndSend, serializableObject, serializer,
setProperties,
maxRetries);
}
public static async Task SendWithRetryAsync(this TopicClient sender,
BrokeredMessage msg,
Action<BrokeredMessage> setProperties = null,
int maxRetries = 5)
{
await SendWithRetryAsyncInternal(sender.BeginSend, sender.EndSend, msg, setProperties, maxRetries);
}
public static async Task SendWithRetryAsync(this TopicClient sender,
Func<BrokeredMessage> getMessage,
Action<BrokeredMessage> setProperties = null,
int maxRetries = 5)
{
await SendWithRetryAsyncInternal(sender.BeginSend, sender.EndSend, getMessage, setProperties, maxRetries);
}
public static async Task SendWithRetryAsyncInternal(
Func<BrokeredMessage, AsyncCallback, object, IAsyncResult> beginSend,
Action<IAsyncResult> endSend,
object serializableObject,
XmlObjectSerializer serializer = null,
Action<BrokeredMessage> setProperties = null,
int maxRetries = 5)
{
await SendWithRetryAsyncInternal(beginSend, endSend, () => serializer != null
? new BrokeredMessage(serializableObject,
serializer)
: new BrokeredMessage(serializableObject),
setProperties, maxRetries);
}
public static async Task SendWithRetryAsyncInternal(
Func<BrokeredMessage, AsyncCallback, object, IAsyncResult> beginSend,
Action<IAsyncResult> endSend,
BrokeredMessage msg,
Action<BrokeredMessage> setProperties = null,
int maxRetries = 5)
{
await SendWithRetryAsyncInternal(beginSend, endSend, () => msg, setProperties, maxRetries);
}
public static async Task SendWithRetryAsyncInternal(
Func<BrokeredMessage, AsyncCallback, object, IAsyncResult> beginSend,
Action<IAsyncResult> endSend,
Func<BrokeredMessage> getMessage,
Action<BrokeredMessage> setProperties = null,
int maxRetries = 5)
{
int retries = 0;
const int retryBackoff = 100;
bool sent = false;
while (!sent)
{
try
{
var brokeredMessage = getMessage();
if (setProperties != null)
{
setProperties(brokeredMessage);
}
await TaskFactory.FromAsync(beginSend, endSend, brokeredMessage, null, TaskCreationOptions.None);
sent = true;
}
catch (MessagingException exception)
{
if (exception.IsTransient)
{
if (++retries > maxRetries)
{
throw;
}
goto Retry;
}
throw;
}
continue;
Retry:
await Task.Delay((int) Math.Pow(3, retries)*retryBackoff);
continue;
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment