Created
September 6, 2012 06:18
-
-
Save clemensv/3652025 to your computer and use it in GitHub Desktop.
Send with retry
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace MartVue.Common | |
{ | |
using System; | |
using System.Runtime.Serialization; | |
using System.Threading.Tasks; | |
using Microsoft.ServiceBus.Messaging; | |
public static class RobustSendExtension | |
{ | |
static readonly TaskFactory TaskFactory = new TaskFactory(); | |
public static async Task SendWithRetryAsync(this MessageSender sender, | |
object serializableObject, | |
XmlObjectSerializer serializer = null, | |
Action<BrokeredMessage> setProperties = null, | |
int maxRetries = 5) | |
{ | |
await SendWithRetryAsyncInternal(sender.BeginSend, sender.EndSend, serializableObject, serializer, | |
setProperties, | |
maxRetries); | |
} | |
public static async Task SendWithRetryAsync(this MessageSender sender, | |
BrokeredMessage msg, | |
Action<BrokeredMessage> setProperties = null, | |
int maxRetries = 5) | |
{ | |
await SendWithRetryAsyncInternal(sender.BeginSend, sender.EndSend, msg, setProperties, maxRetries); | |
} | |
public static async Task SendWithRetryAsync(this MessageSender sender, | |
Func<BrokeredMessage> getMessage, | |
Action<BrokeredMessage> setProperties = null, | |
int maxRetries = 5) | |
{ | |
await SendWithRetryAsyncInternal(sender.BeginSend, sender.EndSend, getMessage, setProperties, maxRetries); | |
} | |
public static async Task SendWithRetryAsync(this QueueClient sender, | |
object serializableObject, | |
XmlObjectSerializer serializer = null, | |
Action<BrokeredMessage> setProperties = null, | |
int maxRetries = 5) | |
{ | |
await SendWithRetryAsyncInternal(sender.BeginSend, sender.EndSend, serializableObject, serializer, | |
setProperties, | |
maxRetries); | |
} | |
public static async Task SendWithRetryAsync(this QueueClient sender, | |
BrokeredMessage msg, | |
Action<BrokeredMessage> setProperties = null, | |
int maxRetries = 5) | |
{ | |
await SendWithRetryAsyncInternal(sender.BeginSend, sender.EndSend, msg, setProperties, maxRetries); | |
} | |
public static async Task SendWithRetryAsync(this QueueClient sender, | |
Func<BrokeredMessage> getMessage, | |
Action<BrokeredMessage> setProperties = null, | |
int maxRetries = 5) | |
{ | |
await SendWithRetryAsyncInternal(sender.BeginSend, sender.EndSend, getMessage, setProperties, maxRetries); | |
} | |
public static async Task SendWithRetryAsync(this TopicClient sender, | |
object serializableObject, | |
XmlObjectSerializer serializer = null, | |
Action<BrokeredMessage> setProperties = null, | |
int maxRetries = 5) | |
{ | |
await | |
SendWithRetryAsyncInternal(sender.BeginSend, sender.EndSend, serializableObject, serializer, | |
setProperties, | |
maxRetries); | |
} | |
public static async Task SendWithRetryAsync(this TopicClient sender, | |
BrokeredMessage msg, | |
Action<BrokeredMessage> setProperties = null, | |
int maxRetries = 5) | |
{ | |
await SendWithRetryAsyncInternal(sender.BeginSend, sender.EndSend, msg, setProperties, maxRetries); | |
} | |
public static async Task SendWithRetryAsync(this TopicClient sender, | |
Func<BrokeredMessage> getMessage, | |
Action<BrokeredMessage> setProperties = null, | |
int maxRetries = 5) | |
{ | |
await SendWithRetryAsyncInternal(sender.BeginSend, sender.EndSend, getMessage, setProperties, maxRetries); | |
} | |
public static async Task SendWithRetryAsyncInternal( | |
Func<BrokeredMessage, AsyncCallback, object, IAsyncResult> beginSend, | |
Action<IAsyncResult> endSend, | |
object serializableObject, | |
XmlObjectSerializer serializer = null, | |
Action<BrokeredMessage> setProperties = null, | |
int maxRetries = 5) | |
{ | |
await SendWithRetryAsyncInternal(beginSend, endSend, () => serializer != null | |
? new BrokeredMessage(serializableObject, | |
serializer) | |
: new BrokeredMessage(serializableObject), | |
setProperties, maxRetries); | |
} | |
public static async Task SendWithRetryAsyncInternal( | |
Func<BrokeredMessage, AsyncCallback, object, IAsyncResult> beginSend, | |
Action<IAsyncResult> endSend, | |
BrokeredMessage msg, | |
Action<BrokeredMessage> setProperties = null, | |
int maxRetries = 5) | |
{ | |
await SendWithRetryAsyncInternal(beginSend, endSend, () => msg, setProperties, maxRetries); | |
} | |
public static async Task SendWithRetryAsyncInternal( | |
Func<BrokeredMessage, AsyncCallback, object, IAsyncResult> beginSend, | |
Action<IAsyncResult> endSend, | |
Func<BrokeredMessage> getMessage, | |
Action<BrokeredMessage> setProperties = null, | |
int maxRetries = 5) | |
{ | |
int retries = 0; | |
const int retryBackoff = 100; | |
bool sent = false; | |
while (!sent) | |
{ | |
try | |
{ | |
var brokeredMessage = getMessage(); | |
if (setProperties != null) | |
{ | |
setProperties(brokeredMessage); | |
} | |
await TaskFactory.FromAsync(beginSend, endSend, brokeredMessage, null, TaskCreationOptions.None); | |
sent = true; | |
} | |
catch (MessagingException exception) | |
{ | |
if (exception.IsTransient) | |
{ | |
if (++retries > maxRetries) | |
{ | |
throw; | |
} | |
goto Retry; | |
} | |
throw; | |
} | |
continue; | |
Retry: | |
await Task.Delay((int) Math.Pow(3, retries)*retryBackoff); | |
continue; | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment