Skip to content

Instantly share code, notes, and snippets.

@nianton
Last active April 23, 2020 06:55
Service Bus long-running message handling
using Microsoft.Azure.ServiceBus;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ServiceBusHandler
{
    /// <summary>
    /// Console sample that subscribes to a Service Bus topic subscription in
    /// peek-lock mode and simulates a long-running handler for every message,
    /// completing the message explicitly once the simulated work is done.
    /// </summary>
    class Program
    {
        const string ServiceBusConnectionString = "<SERVICE_BUS_CONNECTIONSTRING>";
        const int MessageProcessDurationInMins = 18;

        // Upper bound for automatic lock renewal. It has to stay above
        // MessageProcessDurationInMins, otherwise the lock is no longer renewed
        // while the handler is still working.
        const int MaxLockRenewalInMins = 30;

        const string TopicPath = "<TOPIC>";
        const string SubscriptionName = "<SUBSCRIPTION_NAME>";

        static SubscriptionClient subscriptionClient;

        /// <summary>
        /// Registers the message handler, blocks until the user presses Enter,
        /// then closes the subscription client and the underlying connection.
        /// </summary>
        /// <param name="args">Command line arguments (unused).</param>
        static void Main(string[] args)
        {
            Console.WriteLine($"Service Bus Handler - will pretend long process of {MessageProcessDurationInMins} mins per message.");
            Console.WriteLine($"***");
            Console.WriteLine($"Awaiting messages for topic: {TopicPath} and subscription: {SubscriptionName}");
            Console.WriteLine();

            var connection = new ServiceBusConnection(ServiceBusConnectionString);
            var messageHandlerOptions = new MessageHandlerOptions(HandleException)
            {
                MaxAutoRenewDuration = TimeSpan.FromMinutes(MaxLockRenewalInMins),
                // The handler completes the message itself after the work is done.
                AutoComplete = false
            };

            subscriptionClient = new SubscriptionClient(connection, TopicPath, SubscriptionName, ReceiveMode.PeekLock, RetryPolicy.Default);
            try
            {
                subscriptionClient.RegisterMessageHandler(HandleLongRunningMessage, messageHandlerOptions);

                // Blocking exit to remain active...
                Console.ReadLine();
                Console.WriteLine("Exiting...");
            }
            finally
            {
                // Main stays synchronous to keep its signature, so the async close
                // calls are awaited by blocking; a console app has no
                // synchronization context that could deadlock here.
                subscriptionClient.CloseAsync().GetAwaiter().GetResult();

                // The connection was created here and handed to the client, so it
                // is closed explicitly as well.
                connection.CloseAsync().GetAwaiter().GetResult();
            }
        }

        /// <summary>
        /// Handles one message: simulates the long-running work and then
        /// completes the message using its lock token.
        /// </summary>
        /// <param name="message">The received message.</param>
        /// <param name="cancellationToken">
        /// Token supplied by the message pump; when it is cancelled the simulated
        /// work stops and the message is left uncompleted.
        /// </param>
        static async Task HandleLongRunningMessage(Message message, CancellationToken cancellationToken)
        {
            Console.WriteLine($"Handling message: [{message.MessageId}]...");

            try
            {
                await PretendLongProcessing(MessageProcessDurationInMins, cancellationToken);
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Do not complete a message whose processing was interrupted; in
                // peek-lock mode an uncompleted message becomes available again
                // once its lock expires.
                Console.WriteLine($"Cancelled message: [{message.MessageId}]");
                return;
            }

            await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
            Console.WriteLine($"Completed message: [{message.MessageId}]");
        }

        /// <summary>
        /// Simulates work by waiting the given number of minutes, printing a
        /// progress line before each one-minute wait.
        /// </summary>
        /// <param name="minutes">Number of one-minute waits; zero or less returns immediately.</param>
        /// <param name="cancellationToken">Token that aborts the current wait when cancelled.</param>
        /// <exception cref="OperationCanceledException">
        /// Thrown when <paramref name="cancellationToken"/> is cancelled during a wait.
        /// </exception>
        static async Task PretendLongProcessing(int minutes = 1, CancellationToken cancellationToken = default(CancellationToken))
        {
            for (var minute = 1; minute <= minutes; minute++)
            {
                Console.WriteLine($"Waiting {minute} min...");
                await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken);
            }
        }

        /// <summary>
        /// Reports exceptions raised by the message pump: context details go to
        /// standard output, the exception itself to standard error.
        /// </summary>
        /// <param name="args">Exception details and the context in which it occurred.</param>
        /// <returns>An already completed task; the work is purely synchronous.</returns>
        static Task HandleException(ExceptionReceivedEventArgs args)
        {
            var ctx = args.ExceptionReceivedContext;
            Console.WriteLine("*** Error Occurred ***");
            Console.WriteLine($"Exception Context: action:{ctx.Action}, clientId:{ctx.ClientId}, endPoint:{ctx.Endpoint}, entityPath:{ctx.EntityPath}");
            Console.Error.WriteLine(args.Exception);
            return Task.CompletedTask;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment