Skip to content

Instantly share code, notes, and snippets.

@fabiocannas
Created March 18, 2020 17:17
Show Gist options
  • Save fabiocannas/6e6bf298922642f79178e8acbd5eb1ce to your computer and use it in GitHub Desktop.
Save fabiocannas/6e6bf298922642f79178e8acbd5eb1ce to your computer and use it in GitHub Desktop.
Here are two implementations of receiving batch messages from DeadLetterQueue of an Azure service bus entity. The second one is done by using the SubscriptionRuntimeInfo and ManagementClient classes introduced in Microsoft.Azure.ServiceBus version 4.1.1.
using System;
using System.Text;
using System.Threading.Tasks;
using AzureServiceBusExamples.Settings;
using Microsoft.Azure.ServiceBus;
using AzureServiceBusExamples.Loggers;
using System.Collections.Generic;
using Microsoft.Azure.ServiceBus.Core;
using System.Diagnostics;
using System.Linq;
using Microsoft.Azure.ServiceBus.Management;
namespace AzureServiceBusExamples.SendAndReceive
{
public class AzureServiceBusManager : IAzureServiceBusManager
{
private ITopicClient _topicClient;
private string ServiceBusConnectionString => Settings.AzureServiceBus.ConnectionString;
private string TopicName => Settings.AzureServiceBus.TopicName;
private string DeadLetterQueuePath => Settings.AzureServiceBus.DeadLetterQueuePath;
public AzureServiceBusManager()
{
_topicClient = new TopicClient(
ServiceBusConnectionString,
TopicName);
}
public async Task SendAsync(IList<Message> outgoingMessages)
{
try
{
await _topicClient.SendAsync(outgoingMessages).ConfigureAwait(false);
}
catch (Exception exception)
{
throw exception;
}
finally
{
await _topicClient.CloseAsync().ConfigureAwait(false);
}
}
public async Task<IList<Message>> ReceiveAsyncV1()
{
List<Message> receivedMessages = new List<Message>();
MessageReceiver messageReceiver = new MessageReceiver(ServiceBusConnectionString, DeadLetterQueuePath);
List<string> lockTockens = new List<string>();
while (true)
{
try
{
IList<Message> receiveAsyncResult = await messageReceiver.ReceiveAsync(1000, TimeSpan.FromSeconds(30));
if (receiveAsyncResult?.Any() == true)
{
receivedMessages.AddRange(receiveAsyncResult.ToList());
lockTockens.AddRange(receiveAsyncResult.Select(m => m.SystemProperties.LockToken));
await messageReceiver.CompleteAsync(lockTockens);
lockTockens.Clear();
}
else
{
break;
}
}
catch (Exception exception)
{
Logger.Log.Error(exception.Message, exception);
if (lockTockens.Count > 0)
{
foreach (var lockTocken in lockTockens)
{
await messageReceiver.AbandonAsync(lockTocken);
}
lockTockens.Clear();
}
break;
}
}
await messageReceiver.CloseAsync();
return receivedMessages;
}
public async Task<IList<Message>> ReceiveAsyncV2()
{
List<Message> receivedMessages = new List<Message>();
SubscriptionRuntimeInfo subscriptionRunTimeInfo = null;
ManagementClient managementClient = new ManagementClient(ServiceBusConnectionString);
subscriptionRunTimeInfo = await managementClient.GetSubscriptionRuntimeInfoAsync(TopicName, SubscriptionName);
await managementClient.CloseAsync();
long longDeadLetterMessageCount = subscriptionRunTimeInfo.MessageCountDetails.DeadLetterMessageCount;
Logger.Log.Info($"DeadLetterQueue contains {longDeadLetterMessageCount} messages.");
if (longDeadLetterMessageCount == 0)
{
Logger.Log.Info($"Stopping reception since DeadLetterQueue contains zero messages.");
return receivedMessages;
}
int deadLetterMessageCount = 0;
try
{
deadLetterMessageCount = checked((int)longDeadLetterMessageCount);
}
catch (OverflowException exception)
{
Logger.Log.Error($"DeadLetterMessageCount was greater than integer max value: {longDeadLetterMessageCount}");
throw;
}
MessageReceiver messageReceiver = new MessageReceiver(ServiceBusConnectionString, DeadLetterQueuePath);
List<string> lockTockens = new List<string>();
while (receivedMessages.Count < deadLetterMessageCount)
{
try
{
IList<Message> receiveAsyncResult = await messageReceiver.ReceiveAsync(1000, TimeSpan.FromSeconds(30));
if (receiveAsyncResult?.Any() == true)
{
receivedMessages.AddRange(receiveAsyncResult.ToList());
lockTockens.AddRange(receiveAsyncResult.Select(m => m.SystemProperties.LockToken));
await messageReceiver.CompleteAsync(lockTockens);
lockTockens.Clear();
}
}
catch (Exception exception)
{
Logger.Log.Error(exception.Message, exception);
if (lockTockens.Count > 0)
{
foreach (var lockTocken in lockTockens)
{
await messageReceiver.AbandonAsync(lockTocken);
}
lockTockens.Clear();
}
break;
}
}
await messageReceiver.CloseAsync();
return receivedMessages;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment