Skip to content

Instantly share code, notes, and snippets.

Last active October 28, 2019 09:42
Show Gist options
  • Save justinyoo/f8d0e00af42daf449664813600163419 to your computer and use it in GitHub Desktop.
Save justinyoo/f8d0e00af42daf449664813600163419 to your computer and use it in GitHub Desktop.
Handling Messages with Geo-Redundant Azure Service Bus via Azure Functions
/// <summary>
/// Strongly-typed settings for the Azure Service Bus namespaces used by the app
/// (the "AzureServiceBus" section of the app settings).
/// </summary>
public class AzureServiceBusSettings
{
    /// <summary>
    /// Gets or sets the Service Bus connection strings, keyed by a label
    /// (for example "Primary" and "Secondary").
    /// </summary>
    /// <remarks>
    /// Defaults to an empty dictionary rather than null, so that code enumerating the
    /// connection strings does not fail with a <see cref="System.NullReferenceException"/>
    /// when the configuration section is missing.
    /// </remarks>
    public virtual Dictionary<string, string> ConnectionStrings { get; set; } = new Dictionary<string, string>();

    /// <summary>
    /// Gets or sets the topic settings.
    /// </summary>
    public virtual AzureServiceBusTopicSettings Topic { get; set; }
}

/// <summary>
/// Settings describing the Service Bus topic.
/// </summary>
public class AzureServiceBusTopicSettings
{
    /// <summary>
    /// Gets or sets the topic name.
    /// </summary>
    public virtual string Name { get; set; }

    /// <summary>
    /// Gets or sets the settings of the topic's subscription.
    /// </summary>
    public virtual AzureServiceBusTopicSubscriptionSettings Subscription { get; set; }
}

/// <summary>
/// Settings describing a subscription on the Service Bus topic.
/// </summary>
public class AzureServiceBusTopicSubscriptionSettings
{
    /// <summary>
    /// Gets or sets the subscription name.
    /// </summary>
    public virtual string Name { get; set; }
}
/// <summary>
/// Application settings that expose the Azure Service Bus configuration as a typed object.
/// </summary>
public class AppSettings : AppSettingsBase
{
    // Key under which the Service Bus settings are looked up.
    private const string ServiceBusSettingsKey = "AzureServiceBus";

    /// <summary>
    /// Initializes a new instance of the <see cref="AppSettings"/> class and loads the
    /// <see cref="AzureServiceBusSettings"/> stored under the "AzureServiceBus" key.
    /// </summary>
    public AppSettings()
    {
        // Ask the inherited Config object for the strongly-typed settings instance.
        var serviceBus = this.Config.Get<AzureServiceBusSettings>(ServiceBusSettingsKey);

        this.ServiceBus = serviceBus;
    }

    /// <summary>
    /// Gets the Azure Service Bus settings read during construction.
    /// </summary>
    public virtual AzureServiceBusSettings ServiceBus { get; }
}
// HTTP-triggered function bound to POST "messages/receive": asks the message service to
// receive messages, logs each one handed to the callback, and responds with an empty OK result.
// NOTE(review): the enclosing class, the braces and part of the awaited call chain are not
// visible in this listing — confirm against the complete file.
public async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "post", Route = "messages/receive")] HttpRequest req,
ILogger log)
log.LogInformation("C# HTTP trigger function processed a request.");
// The callback passed to ReceiveAsync is given the client and the message it delivered.
await this._service
.ReceiveAsync(async (client, message) =>
// Log the message ID together with the endpoint of the client's connection.
log.LogInformation($"Processed: {message.MessageId} at {client.ServiceBusConnection.Endpoint}");
// Nothing asynchronous happens in the callback; awaiting a completed task keeps the lambda async.
await Task.CompletedTask.ConfigureAwait(false);
return new OkResult();
// HTTP-triggered function bound to POST "messages/send": builds a sample payload and returns
// the awaited result of the message-service call inside an OK response.
// NOTE(review): the call chained onto this._service (and the place where payload is used) is
// not visible in this listing — confirm against the complete file.
public async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "post", Route = "messages/send")] HttpRequest req,
ILogger log)
log.LogInformation("C# HTTP trigger function processed a request.");
// Fixed demo payload; the request body is not read in the visible lines.
var payload = new SamplePayload() { Message = "Hello World" };
var result = await this._service
// Whatever the service call produced is returned to the caller as the response body.
return new OkObjectResult(result);
/// <summary>
/// HTTP trigger class for sending messages; receives its message service through the constructor.
/// </summary>
public class MessageSendHttpTrigger
{
    private readonly IMessageService _service;

    /// <summary>
    /// Initializes a new instance of the <see cref="MessageSendHttpTrigger"/> class.
    /// </summary>
    /// <param name="service">The message service used by the trigger.</param>
    /// <exception cref="ArgumentNullException"><paramref name="service"/> is null.</exception>
    public MessageSendHttpTrigger(IMessageService service)
    {
        // Reject a missing dependency up front instead of failing later on first use.
        if (service == null)
        {
            throw new ArgumentNullException(nameof(service));
        }

        this._service = service;
    }
}
// Declares StartUp as the FunctionsStartup type for this assembly.
[assembly: FunctionsStartup(typeof(GeoRedundant.FunctionApp.StartUp))]
namespace GeoRedundant.FunctionApp
// Startup class where the function app's dependencies are registered.
public class StartUp : FunctionsStartup
// NOTE(review): the body of Configure is not visible in this listing; presumably it invokes
// the two private helpers below with builder.Services — confirm against the complete file.
public override void Configure(IFunctionsHostBuilder builder)
// NOTE(review): the body of ConfigureAppSettings is not visible in this listing.
private void ConfigureAppSettings(IServiceCollection services)
// Registers the application's services with the container.
private void ConfigureServices(IServiceCollection services)
// Transient lifetime: a new MessageService is created each time IMessageService is resolved.
services.AddTransient<IMessageService, MessageService>();
"Values": {
"AzureServiceBus__ConnectionStrings__Primary": "[AZURE_SERVICE_BUS_CONNECTION_STRING]",
"AzureServiceBus__ConnectionStrings__Secondary": "[AZURE_SERVICE_BUS_CONNECTION_STRING]",
"AzureServiceBus__Topic__Name": "my-topic",
"AzureServiceBus__Topic__Subscription__Name": "my-topic-subscription"
/// <summary>
/// Registers a message handler on every subscription client and forwards each message to
/// <paramref name="callbackToProcess"/>, skipping a message whose ID was already seen.
/// </summary>
/// <param name="callbackToProcess">
/// Callback invoked with the delivering client and the message for every non-duplicate message.
/// </param>
/// <returns>A task that completes once the handlers have been registered.</returns>
/// <exception cref="InvalidOperationException">No subscription client is available.</exception>
/// <exception cref="AggregateException">
/// Every subscription client has already reported an exception by the time registration finishes.
/// </exception>
public async Task ReceiveAsync(Func<ISubscriptionClient, Message, Task> callbackToProcess)
{
    // Recently seen message IDs, shared by the handlers of all clients and guarded by msglock.
    var messageIds = new List<string>();
    var msglock = new object();

    // Local function: Handles messages.
    async Task onMessageReceived(ISubscriptionClient client, Message message, int maxMessageDeduplicationCount = 20)
    {
        var duplicated = false;
        lock (msglock)
        {
            // Remove returns true when the ID is already in the list, i.e. this is a duplicate
            // delivery; the entry is dropped at the same time.
            duplicated = messageIds.Remove(message.MessageId);
            if (!duplicated)
            {
                // NOTE(review): the Add/RemoveAt statements were absent from the reviewed
                // listing and are restored from the surrounding logic — confirm.
                messageIds.Add(message.MessageId);
                if (messageIds.Count > maxMessageDeduplicationCount)
                {
                    // Keep the list bounded by evicting the oldest ID.
                    messageIds.RemoveAt(0);
                }
            }
        }

        // The callback runs outside the lock so slow processing does not block other handlers.
        if (!duplicated)
        {
            await callbackToProcess(client, message).ConfigureAwait(false);
        }
    }

    var exceptions = new ConcurrentQueue<Exception>();

    // Local function: Handles exceptions.
    async Task onExceptionReceived(ExceptionReceivedEventArgs args)
    {
        // NOTE(review): restored — without this the queue checked below can never fill.
        exceptions.Enqueue(args.Exception);
        await Task.CompletedTask.ConfigureAwait(false);
    }

    if (!this._subscriptionClients.Any())
    {
        throw new InvalidOperationException("No SubscriptionClient exist");
    }

    foreach (var client in this._subscriptionClients)
    {
        // Fix: the handler previously called onMessageReceived(client, 1, msg), passing an int
        // where the Message is expected and the Message where the int is expected. The message
        // is now the second argument and the default de-duplication window is used; a window
        // of 1 would evict an ID before the other client could deliver the same message.
        client.RegisterMessageHandler(
            (msg, token) => onMessageReceived(client, msg),
            new MessageHandlerOptions(onExceptionReceived) { AutoComplete = true, MaxConcurrentCalls = 1 });
    }

    // Fail only when every client has reported an exception.
    if (exceptions.Count == this._subscriptionClients.Count)
    {
        throw new AggregateException(exceptions);
    }

    await Task.CompletedTask.ConfigureAwait(false);
}
/// <summary>
/// Sends <paramref name="value"/> as a UTF-8 encoded message to every topic client and
/// returns the ID assigned to the message.
/// </summary>
/// <param name="value">The message body.</param>
/// <returns>The generated message ID.</returns>
/// <exception cref="InvalidOperationException">No topic client is available.</exception>
/// <exception cref="AggregateException">Sending failed on every topic client.</exception>
public async Task<string> SendAsync(string value)
{
    var body = Encoding.UTF8.GetBytes(value);

    // One ID is generated up front and returned to the caller.
    // NOTE(review): presumably each clone sent below carries this same MessageId, which the
    // receiving side relies on to detect duplicates — confirm against Message.Clone().
    var message = new Message(body) { MessageId = Guid.NewGuid().ToString() };
    var exceptions = new ConcurrentQueue<Exception>();

    if (!this._topicClients.Any())
    {
        throw new InvalidOperationException("No TopicClient exist");
    }

    // Fan-out messages.
    foreach (var client in this._topicClients)
    {
        try
        {
            await client.SendAsync(message.Clone()).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            // Fix: record the failure. Previously nothing was added to the queue, so the
            // check below could never fire and every send failure was silently swallowed.
            // A failure on one client is tolerated as long as another client succeeds.
            exceptions.Enqueue(ex);
        }
    }

    // Throw the exception if all clients fail sending the message.
    if (exceptions.Count == this._topicClients.Count)
    {
        throw new AggregateException(exceptions);
    }

    return message.MessageId;
}
// Walks the configured connection strings, constructing a SubscriptionClient for each, and
// returns this instance so that calls can be chained.
// NOTE(review): the constructor arguments and what is done with each created client are not
// visible in this listing — confirm against the complete file.
public IMessageService WithSubscriptionClients()
foreach (var kvp in this._settings.ConnectionStrings)
var client = new SubscriptionClient(
return this;
// Walks the configured connection strings, constructing a TopicClient for each, and returns
// this instance so that calls can be chained.
public IMessageService WithTopicClients()
foreach (var kvp in this._settings.ConnectionStrings)
// kvp.Value is the connection string; every client targets the same configured topic name.
// NOTE(review): the visible lines never store the created client (e.g. in _topicClients);
// the statement doing so appears to be missing from this listing — confirm.
var client = new TopicClient(kvp.Value, this._settings.Topic.Name);
return this;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment