Created
March 18, 2020 17:17
-
-
Save fabiocannas/6e6bf298922642f79178e8acbd5eb1ce to your computer and use it in GitHub Desktop.
Here are two implementations of receiving batch messages from DeadLetterQueue of an Azure service bus entity. The second one is done by using the SubscriptionRuntimeInfo and ManagementClient classes introduced in Microsoft.Azure.ServiceBus version 4.1.1.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Text; | |
using System.Threading.Tasks; | |
using AzureServiceBusExamples.Settings; | |
using Microsoft.Azure.ServiceBus; | |
using AzureServiceBusExamples.Loggers; | |
using System.Collections.Generic; | |
using Microsoft.Azure.ServiceBus.Core; | |
using System.Diagnostics; | |
using System.Linq; | |
using Microsoft.Azure.ServiceBus.Management; | |
namespace AzureServiceBusExamples.SendAndReceive | |
{ | |
public class AzureServiceBusManager : IAzureServiceBusManager | |
{ | |
private ITopicClient _topicClient; | |
private string ServiceBusConnectionString => Settings.AzureServiceBus.ConnectionString; | |
private string TopicName => Settings.AzureServiceBus.TopicName; | |
private string DeadLetterQueuePath => Settings.AzureServiceBus.DeadLetterQueuePath; | |
public AzureServiceBusManager() | |
{ | |
_topicClient = new TopicClient( | |
ServiceBusConnectionString, | |
TopicName); | |
} | |
public async Task SendAsync(IList<Message> outgoingMessages) | |
{ | |
try | |
{ | |
await _topicClient.SendAsync(outgoingMessages).ConfigureAwait(false); | |
} | |
catch (Exception exception) | |
{ | |
throw exception; | |
} | |
finally | |
{ | |
await _topicClient.CloseAsync().ConfigureAwait(false); | |
} | |
} | |
public async Task<IList<Message>> ReceiveAsyncV1() | |
{ | |
List<Message> receivedMessages = new List<Message>(); | |
MessageReceiver messageReceiver = new MessageReceiver(ServiceBusConnectionString, DeadLetterQueuePath); | |
List<string> lockTockens = new List<string>(); | |
while (true) | |
{ | |
try | |
{ | |
IList<Message> receiveAsyncResult = await messageReceiver.ReceiveAsync(1000, TimeSpan.FromSeconds(30)); | |
if (receiveAsyncResult?.Any() == true) | |
{ | |
receivedMessages.AddRange(receiveAsyncResult.ToList()); | |
lockTockens.AddRange(receiveAsyncResult.Select(m => m.SystemProperties.LockToken)); | |
await messageReceiver.CompleteAsync(lockTockens); | |
lockTockens.Clear(); | |
} | |
else | |
{ | |
break; | |
} | |
} | |
catch (Exception exception) | |
{ | |
Logger.Log.Error(exception.Message, exception); | |
if (lockTockens.Count > 0) | |
{ | |
foreach (var lockTocken in lockTockens) | |
{ | |
await messageReceiver.AbandonAsync(lockTocken); | |
} | |
lockTockens.Clear(); | |
} | |
break; | |
} | |
} | |
await messageReceiver.CloseAsync(); | |
return receivedMessages; | |
} | |
public async Task<IList<Message>> ReceiveAsyncV2() | |
{ | |
List<Message> receivedMessages = new List<Message>(); | |
SubscriptionRuntimeInfo subscriptionRunTimeInfo = null; | |
ManagementClient managementClient = new ManagementClient(ServiceBusConnectionString); | |
subscriptionRunTimeInfo = await managementClient.GetSubscriptionRuntimeInfoAsync(TopicName, SubscriptionName); | |
await managementClient.CloseAsync(); | |
long longDeadLetterMessageCount = subscriptionRunTimeInfo.MessageCountDetails.DeadLetterMessageCount; | |
Logger.Log.Info($"DeadLetterQueue contains {longDeadLetterMessageCount} messages."); | |
if (longDeadLetterMessageCount == 0) | |
{ | |
Logger.Log.Info($"Stopping reception since DeadLetterQueue contains zero messages."); | |
return receivedMessages; | |
} | |
int deadLetterMessageCount = 0; | |
try | |
{ | |
deadLetterMessageCount = checked((int)longDeadLetterMessageCount); | |
} | |
catch (OverflowException exception) | |
{ | |
Logger.Log.Error($"DeadLetterMessageCount was greater than integer max value: {longDeadLetterMessageCount}"); | |
throw; | |
} | |
MessageReceiver messageReceiver = new MessageReceiver(ServiceBusConnectionString, DeadLetterQueuePath); | |
List<string> lockTockens = new List<string>(); | |
while (receivedMessages.Count < deadLetterMessageCount) | |
{ | |
try | |
{ | |
IList<Message> receiveAsyncResult = await messageReceiver.ReceiveAsync(1000, TimeSpan.FromSeconds(30)); | |
if (receiveAsyncResult?.Any() == true) | |
{ | |
receivedMessages.AddRange(receiveAsyncResult.ToList()); | |
lockTockens.AddRange(receiveAsyncResult.Select(m => m.SystemProperties.LockToken)); | |
await messageReceiver.CompleteAsync(lockTockens); | |
lockTockens.Clear(); | |
} | |
} | |
catch (Exception exception) | |
{ | |
Logger.Log.Error(exception.Message, exception); | |
if (lockTockens.Count > 0) | |
{ | |
foreach (var lockTocken in lockTockens) | |
{ | |
await messageReceiver.AbandonAsync(lockTocken); | |
} | |
lockTockens.Clear(); | |
} | |
break; | |
} | |
} | |
await messageReceiver.CloseAsync(); | |
return receivedMessages; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment