Skip to content

Instantly share code, notes, and snippets.

@tomkuijsten
Created April 3, 2023 14:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tomkuijsten/aa3374c09ec0b6db4a542aa8ef343716 to your computer and use it in GitHub Desktop.
Save tomkuijsten/aa3374c09ec0b6db4a542aa8ef343716 to your computer and use it in GitHub Desktop.
namespace ServiceBus.Extensions.Bindings.MessageActions;
public class MessageActionsExtended
{
private const string RETRY_COUNT = "X-RetryCount";
private readonly ServiceBusMessageActions _messageActions;
private readonly ServiceBusSender _sender;
// Summary:
// For testing purposes only
public MessageActionsExtended() { }
public MessageActionsExtended(
string queueOrTopicName,
ServiceBusClient client,
ServiceBusMessageActions messageActions)
{
_messageActions = messageActions;
_sender = client.CreateSender(queueOrTopicName);
}
public virtual Task DeadLetterAsync(ServiceBusReceivedMessage message, string error, string details)
{
return _messageActions.DeadLetterMessageAsync(message, error, details);
}
public virtual Task AbandonAsync(ServiceBusReceivedMessage message)
{
return _messageActions.AbandonMessageAsync(message);
}
public virtual Task BackoffOrDeadletterAsync(
ServiceBusReceivedMessage message,
int times,
TimeSpan delay,
string error = null,
string errorDetails = null)
{
var ms = Enumerable.Repeat(delay, times).Select(t => t.TotalMilliseconds).ToArray();
return BackoffOrDeadletterAsync(message, ms, error, errorDetails);
}
public virtual async Task BackoffOrDeadletterAsync(
ServiceBusReceivedMessage message,
double[] backoffIntervalsInMs,
string error = null,
string errorDetails = null)
{
int times = backoffIntervalsInMs.Length;
int retryCount = 1;
if (message.ApplicationProperties.TryGetValue(RETRY_COUNT, out var deliveryCountFromMessage))
{
retryCount = (deliveryCountFromMessage as int?) ?? 0;
retryCount++;
}
if (retryCount > times)
{
await DeadLetterAsync(message, error, errorDetails);
return;
}
var newMessage = new ServiceBusMessage(message)
{
MessageId = Guid.NewGuid().ToString("N").ToLowerInvariant(),
ScheduledEnqueueTime = DateTime.UtcNow.AddMilliseconds(backoffIntervalsInMs[retryCount - 1])
};
newMessage.ApplicationProperties[RETRY_COUNT] = retryCount;
await _sender.SendMessageAsync(newMessage);
// Only when the new message was send, we can complete the original message.
await CompleteAsync(message);
}
public virtual Task BackoffOrCompleteAsync(ServiceBusReceivedMessage message, int times)
{
return BackoffOrCompleteAsync(message, Enumerable.Repeat(0.0, times).ToArray());
}
public virtual async Task BackoffOrCompleteAsync(ServiceBusReceivedMessage message, double[] backoffIntervalsInMs)
{
int times = backoffIntervalsInMs.Length;
int retryCount = 1;
if (message.ApplicationProperties.TryGetValue(RETRY_COUNT, out var deliveryCountFromMessage))
{
retryCount = (deliveryCountFromMessage as int?) ?? 0;
retryCount++;
}
if (retryCount > times)
{
await CompleteAsync(message);
return;
}
var newMessage = new ServiceBusMessage(message)
{
MessageId = Guid.NewGuid().ToString("N").ToLowerInvariant(),
ScheduledEnqueueTime = DateTime.UtcNow.AddMilliseconds(backoffIntervalsInMs[retryCount - 1])
};
newMessage.ApplicationProperties[RETRY_COUNT] = retryCount;
await _sender.SendMessageAsync(newMessage);
// Only when the new message was send, we can complete the original message.
await CompleteAsync(message);
}
public virtual Task CompleteAsync(ServiceBusReceivedMessage message)
{
return _messageActions.CompleteMessageAsync(message);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment