Created
April 3, 2023 14:52
-
-
Save tomkuijsten/aa3374c09ec0b6db4a542aa8ef343716 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace ServiceBus.Extensions.Bindings.MessageActions; | |
public class MessageActionsExtended | |
{ | |
private const string RETRY_COUNT = "X-RetryCount"; | |
private readonly ServiceBusMessageActions _messageActions; | |
private readonly ServiceBusSender _sender; | |
// Summary: | |
// For testing purposes only | |
public MessageActionsExtended() { } | |
public MessageActionsExtended( | |
string queueOrTopicName, | |
ServiceBusClient client, | |
ServiceBusMessageActions messageActions) | |
{ | |
_messageActions = messageActions; | |
_sender = client.CreateSender(queueOrTopicName); | |
} | |
public virtual Task DeadLetterAsync(ServiceBusReceivedMessage message, string error, string details) | |
{ | |
return _messageActions.DeadLetterMessageAsync(message, error, details); | |
} | |
public virtual Task AbandonAsync(ServiceBusReceivedMessage message) | |
{ | |
return _messageActions.AbandonMessageAsync(message); | |
} | |
public virtual Task BackoffOrDeadletterAsync( | |
ServiceBusReceivedMessage message, | |
int times, | |
TimeSpan delay, | |
string error = null, | |
string errorDetails = null) | |
{ | |
var ms = Enumerable.Repeat(delay, times).Select(t => t.TotalMilliseconds).ToArray(); | |
return BackoffOrDeadletterAsync(message, ms, error, errorDetails); | |
} | |
public virtual async Task BackoffOrDeadletterAsync( | |
ServiceBusReceivedMessage message, | |
double[] backoffIntervalsInMs, | |
string error = null, | |
string errorDetails = null) | |
{ | |
int times = backoffIntervalsInMs.Length; | |
int retryCount = 1; | |
if (message.ApplicationProperties.TryGetValue(RETRY_COUNT, out var deliveryCountFromMessage)) | |
{ | |
retryCount = (deliveryCountFromMessage as int?) ?? 0; | |
retryCount++; | |
} | |
if (retryCount > times) | |
{ | |
await DeadLetterAsync(message, error, errorDetails); | |
return; | |
} | |
var newMessage = new ServiceBusMessage(message) | |
{ | |
MessageId = Guid.NewGuid().ToString("N").ToLowerInvariant(), | |
ScheduledEnqueueTime = DateTime.UtcNow.AddMilliseconds(backoffIntervalsInMs[retryCount - 1]) | |
}; | |
newMessage.ApplicationProperties[RETRY_COUNT] = retryCount; | |
await _sender.SendMessageAsync(newMessage); | |
// Only when the new message was send, we can complete the original message. | |
await CompleteAsync(message); | |
} | |
public virtual Task BackoffOrCompleteAsync(ServiceBusReceivedMessage message, int times) | |
{ | |
return BackoffOrCompleteAsync(message, Enumerable.Repeat(0.0, times).ToArray()); | |
} | |
public virtual async Task BackoffOrCompleteAsync(ServiceBusReceivedMessage message, double[] backoffIntervalsInMs) | |
{ | |
int times = backoffIntervalsInMs.Length; | |
int retryCount = 1; | |
if (message.ApplicationProperties.TryGetValue(RETRY_COUNT, out var deliveryCountFromMessage)) | |
{ | |
retryCount = (deliveryCountFromMessage as int?) ?? 0; | |
retryCount++; | |
} | |
if (retryCount > times) | |
{ | |
await CompleteAsync(message); | |
return; | |
} | |
var newMessage = new ServiceBusMessage(message) | |
{ | |
MessageId = Guid.NewGuid().ToString("N").ToLowerInvariant(), | |
ScheduledEnqueueTime = DateTime.UtcNow.AddMilliseconds(backoffIntervalsInMs[retryCount - 1]) | |
}; | |
newMessage.ApplicationProperties[RETRY_COUNT] = retryCount; | |
await _sender.SendMessageAsync(newMessage); | |
// Only when the new message was send, we can complete the original message. | |
await CompleteAsync(message); | |
} | |
public virtual Task CompleteAsync(ServiceBusReceivedMessage message) | |
{ | |
return _messageActions.CompleteMessageAsync(message); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment