Last active
June 15, 2022 14:14
-
-
Save ramonsmits/fd11cfe0ad516fbf908288b412ab75cc to your computer and use it in GitHub Desktop.
NServiceBus 7 - Validate if the Azure Service Bus unicast destination exists
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Usage: | |
// | |
// var endpointConfiguration = new EndpointConfiguration("MyEndpoint") | |
// ... | |
// var connectionString = Environment.GetEnvironmentVariable("AzureServiceBus_ConnectionString"); | |
// var managementClient = new ManagementClient(connectionString); | |
// endpointConfiguration.Pipeline.Register(new ValidateDestinationExistsBehavior (managementClient), nameof(ValidateDestinationExistsBehavior)); | |
// | |
using Microsoft.Azure.ServiceBus.Management; | |
using NServiceBus.Pipeline; | |
using NServiceBus.Routing; | |
class ValidateDestinationExistsBehavior : IBehavior<IOutgoingPhysicalMessageContext, IOutgoingPhysicalMessageContext> | |
{ | |
const int CacheLimit = 2500; | |
readonly HashSet<string> KnownDestinations = new HashSet<string>(); | |
readonly ManagementClient ManagementClient; | |
public ValidateDestinationExistsBehavior(ManagementClient managementClient) | |
{ | |
ManagementClient = managementClient; | |
} | |
public async Task Invoke(IOutgoingPhysicalMessageContext context, Func<IOutgoingPhysicalMessageContext, Task> next) | |
{ | |
var routingStrategy = context.RoutingStrategies.Single(); | |
var targetEndpoint = routingStrategy.Apply(new Dictionary<string, string>()); | |
var unicastAddress = targetEndpoint as UnicastAddressTag; | |
if (unicastAddress is not null) | |
{ | |
var destination = unicastAddress.Destination; | |
if (!KnownDestinations.Contains(destination)) | |
{ | |
var exists = await ManagementClient.QueueExistsAsync(destination) | |
.ConfigureAwait(false); | |
if (!exists) | |
{ | |
throw new InvalidOperationException("Destination does not exist.") | |
{ | |
Data = { { "Destination", destination } } | |
}; | |
} | |
// If we allow unbounded collection this could cause a memory leak. Ensuring we have check for a maximum number of items. | |
if (KnownDestinations.Count > CacheLimit) | |
{ | |
throw new InvalidOperationException($"Known destination cache exceeds {CacheLimit} destinations."); | |
} | |
KnownDestinations.Add(destination); | |
} | |
} | |
await next(context) | |
.ConfigureAwait(false); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment