Skip to content

Instantly share code, notes, and snippets.

@justinyoo
Last active October 28, 2019 09:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save justinyoo/f8d0e00af42daf449664813600163419 to your computer and use it in GitHub Desktop.
Save justinyoo/f8d0e00af42daf449664813600163419 to your computer and use it in GitHub Desktop.
Handling Messages with Geo-Redundant Azure Service Bus via Azure Functions
/// <summary>
/// Strongly-typed Azure Service Bus settings: a set of connection strings plus the topic to use.
/// </summary>
public class AzureServiceBusSettings
{
    /// <summary>
    /// Gets or sets the connection strings, each stored under a string key.
    /// </summary>
    public virtual Dictionary<string, string> ConnectionStrings { get; set; }

    /// <summary>
    /// Gets or sets the topic settings.
    /// </summary>
    public virtual AzureServiceBusTopicSettings Topic { get; set; }
}
/// <summary>
/// Settings describing a Service Bus topic and its subscription.
/// </summary>
public class AzureServiceBusTopicSettings
{
    /// <summary>
    /// Gets or sets the topic name.
    /// </summary>
    public virtual string Name { get; set; }

    /// <summary>
    /// Gets or sets the settings of the subscription on this topic.
    /// </summary>
    public virtual AzureServiceBusTopicSubscriptionSettings Subscription { get; set; }
}
/// <summary>
/// Settings describing a Service Bus topic subscription.
/// </summary>
public class AzureServiceBusTopicSubscriptionSettings
{
    /// <summary>
    /// Gets or sets the subscription name.
    /// </summary>
    public virtual string Name { get; set; }
}
/// <summary>
/// Application settings that expose the Azure Service Bus section as a strongly-typed object.
/// </summary>
public class AppSettings : AppSettingsBase
{
    // Key under which the Service Bus section is looked up in the configuration.
    private const string ServiceBusSettingsKey = "AzureServiceBus";

    /// <summary>
    /// Initializes a new instance of the <see cref="AppSettings"/> class and reads the
    /// Service Bus section from the inherited configuration.
    /// </summary>
    public AppSettings()
    {
        var serviceBusSettings = this.Config.Get<AzureServiceBusSettings>(ServiceBusSettingsKey);

        this.ServiceBus = serviceBusSettings;
    }

    /// <summary>
    /// Gets the Azure Service Bus settings loaded by the constructor.
    /// </summary>
    public virtual AzureServiceBusSettings ServiceBus { get; }
}
/// <summary>
/// Handles POST requests on <c>messages/receive</c>: builds the subscription clients on the
/// message service and starts receiving, logging each message passed to the callback.
/// </summary>
/// <param name="req">Incoming HTTP request (not read by this method).</param>
/// <param name="log">Logger used for the trace messages.</param>
/// <returns>An <see cref="OkResult"/> once <c>ReceiveAsync</c> has completed.</returns>
[FunctionName(nameof(MessageReceiveHttpTrigger))]
public async Task<IActionResult> Run(
    [HttpTrigger(AuthorizationLevel.Function, "post", Route = "messages/receive")] HttpRequest req,
    ILogger log)
{
    log.LogInformation("C# HTTP trigger function processed a request.");

    var receiver = this._service.WithSubscriptionClients();

    await receiver
        .ReceiveAsync(async (client, message) =>
        {
            // Log which message was handled and on which Service Bus endpoint it arrived.
            var messageId = message.MessageId;
            var endpoint = client.ServiceBusConnection.Endpoint;
            log.LogInformation($"Processed: {messageId} at {endpoint}");

            await Task.CompletedTask.ConfigureAwait(false);
        })
        .ConfigureAwait(false);

    return new OkResult();
}
/// <summary>
/// Handles POST requests on <c>messages/send</c>: builds the topic clients on the message
/// service and sends a fixed sample payload through them.
/// </summary>
/// <param name="req">Incoming HTTP request (not read by this method).</param>
/// <param name="log">Logger used for the trace message.</param>
/// <returns>An <see cref="OkObjectResult"/> wrapping whatever <c>SendAsync</c> returned.</returns>
[FunctionName(nameof(MessageSendHttpTrigger))]
public async Task<IActionResult> Run(
    [HttpTrigger(AuthorizationLevel.Function, "post", Route = "messages/send")] HttpRequest req,
    ILogger log)
{
    log.LogInformation("C# HTTP trigger function processed a request.");

    var payload = new SamplePayload { Message = "Hello World" };
    var sender = this._service.WithTopicClients();
    var sendResult = await sender.SendAsync(payload).ConfigureAwait(false);

    return new OkObjectResult(sendResult);
}
/// <summary>
/// Class that receives its <see cref="IMessageService"/> through the constructor and keeps it
/// for use by its members.
/// </summary>
public class MessageSendHttpTrigger
{
// Message service supplied through the constructor; never reassigned afterwards.
private readonly IMessageService _service;
/// <summary>
/// Initializes a new instance of the <see cref="MessageSendHttpTrigger"/> class.
/// </summary>
/// <param name="service">Message service to use; must not be null.</param>
/// <exception cref="ArgumentNullException"><paramref name="service"/> is null.</exception>
public MessageSendHttpTrigger(IMessageService service)
{
// Fail at construction time when no service was supplied.
this._service = service
?? throw new ArgumentNullException(nameof(service));
}
// The remaining members are elided in this snippet.
...
}
[assembly: FunctionsStartup(typeof(GeoRedundant.FunctionApp.StartUp))]
namespace GeoRedundant.FunctionApp
{
    /// <summary>
    /// Functions startup class that registers the application's dependencies.
    /// </summary>
    public class StartUp : FunctionsStartup
    {
        /// <summary>
        /// Registers the app settings first and then the services on the host builder's
        /// service collection.
        /// </summary>
        /// <param name="builder">Host builder whose service collection is populated.</param>
        public override void Configure(IFunctionsHostBuilder builder)
        {
            this.ConfigureAppSettings(builder.Services);
            this.ConfigureServices(builder.Services);
        }

        // Registers AppSettings as a singleton.
        private void ConfigureAppSettings(IServiceCollection services)
            => services.AddSingleton<AppSettings>();

        // Registers MessageService as the transient implementation of IMessageService.
        private void ConfigureServices(IServiceCollection services)
            => services.AddTransient<IMessageService, MessageService>();
    }
}
{
"Values": {
...
"AzureServiceBus__ConnectionStrings__Primary": "[AZURE_SERVICE_BUS_CONNECTION_STRING]",
"AzureServiceBus__ConnectionStrings__Secondary": "[AZURE_SERVICE_BUS_CONNECTION_STRING]",
"AzureServiceBus__Topic__Name": "my-topic",
"AzureServiceBus__Topic__Subscription__Name": "my-topic-subscription"
}
}
/// <summary>
/// Registers a message handler on every subscription client so that each received message is
/// handed to <paramref name="callbackToProcess"/>, skipping a message whose id was already
/// seen on another client.
/// </summary>
/// <param name="callbackToProcess">
/// Callback invoked with the receiving client and the message for every non-duplicate message.
/// </param>
/// <returns>A task that completes once the handlers have been registered.</returns>
/// <exception cref="InvalidOperationException">No subscription client has been configured.</exception>
/// <exception cref="AggregateException">
/// The number of collected handler exceptions equals the number of subscription clients.
/// </exception>
public async Task ReceiveAsync(Func<ISubscriptionClient, Message, Task> callbackToProcess)
{
    // Ids of messages seen once so far; shared by all handlers and guarded by msglock.
    var messageIds = new List<string>();
    var msglock = new object();

    // Local function: Handles messages.
    async Task onMessageReceived(ISubscriptionClient client, Message message, int maxMessageDeduplicationCount = 20)
    {
        var duplicated = false;
        lock (msglock)
        {
            // Remove returns true when the id was already recorded, i.e. this is the second
            // copy of the message; the id is dropped from the list at the same time.
            duplicated = messageIds.Remove(message.MessageId);
            if (!duplicated)
            {
                messageIds.Add(message.MessageId);

                // Keep the list bounded by discarding the oldest recorded id.
                if (messageIds.Count > maxMessageDeduplicationCount)
                {
                    messageIds.RemoveAt(0);
                }
            }
        }

        if (!duplicated)
        {
            await callbackToProcess(client, message).ConfigureAwait(false);
        }
    }

    var exceptions = new ConcurrentQueue<Exception>();

    // Local function: Handles exceptions.
    async Task onExceptionReceived(ExceptionReceivedEventArgs args)
    {
        exceptions.Enqueue(args.Exception);
        await Task.CompletedTask.ConfigureAwait(false);
    }

    if (!this._subscriptionClients.Any())
    {
        throw new InvalidOperationException("No SubscriptionClient exist");
    }

    foreach (var client in this._subscriptionClients)
    {
        // Pass the message as the second argument and leave the deduplication window at its
        // default; the previous call passed (client, 1, msg), which does not match the
        // local function's (client, message, maxMessageDeduplicationCount) signature.
        client.RegisterMessageHandler(
            (msg, token) => onMessageReceived(client, msg),
            new MessageHandlerOptions(onExceptionReceived) { AutoComplete = true, MaxConcurrentCalls = 1 });
    }

    // This check runs right after registration, so it only sees exceptions that were
    // already reported by that point.
    if (exceptions.Count == this._subscriptionClients.Count)
    {
        throw new AggregateException(exceptions);
    }

    await Task.CompletedTask.ConfigureAwait(false);
}
/// <summary>
/// Sends <paramref name="value"/> as a UTF-8 encoded message to every topic client, using a
/// clone of the same message for each client.
/// </summary>
/// <param name="value">Text to send as the message body; must not be null.</param>
/// <returns>The id assigned to the message that was created for this call.</returns>
/// <exception cref="ArgumentNullException"><paramref name="value"/> is null.</exception>
/// <exception cref="InvalidOperationException">No topic client has been configured.</exception>
/// <exception cref="AggregateException">Every topic client failed to send the message.</exception>
public async Task<string> SendAsync(string value)
{
    // Encoding.GetBytes would also throw ArgumentNullException for null input; throwing here
    // keeps the exception type and reports the caller-visible parameter name.
    if (value == null)
    {
        throw new ArgumentNullException(nameof(value));
    }

    var body = Encoding.UTF8.GetBytes(value);
    var message = new Message(body) { MessageId = Guid.NewGuid().ToString() };
    var exceptions = new ConcurrentQueue<Exception>();

    if (!this._topicClients.Any())
    {
        throw new InvalidOperationException("No TopicClient exist");
    }

    // Fan-out messages.
    foreach (var client in this._topicClients)
    {
        try
        {
            // ConfigureAwait(false) matches every other await in this code.
            await client.SendAsync(message.Clone()).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            // A single failing client is tolerated; failures are collected and only
            // surfaced below when no client succeeded.
            exceptions.Enqueue(ex);
        }
    }

    // Throw the exception if all clients fail sending the message.
    if (exceptions.Count == this._topicClients.Count)
    {
        throw new AggregateException(exceptions);
    }

    return message.MessageId;
}
/// <summary>
/// Empties the subscription client list and refills it with one peek-lock
/// <see cref="SubscriptionClient"/> per configured connection string, all targeting the
/// configured topic and subscription.
/// </summary>
/// <returns>This instance, so that further calls can be chained.</returns>
public IMessageService WithSubscriptionClients()
{
    this._subscriptionClients.Clear();

    // Only the connection string values are needed; the dictionary keys are not used.
    foreach (var connectionString in this._settings.ConnectionStrings.Values)
    {
        this._subscriptionClients.Add(
            new SubscriptionClient(
                connectionString,
                this._settings.Topic.Name,
                this._settings.Topic.Subscription.Name,
                ReceiveMode.PeekLock));
    }

    return this;
}
/// <summary>
/// Empties the topic client list and refills it with one <see cref="TopicClient"/> per
/// configured connection string, all targeting the configured topic.
/// </summary>
/// <returns>This instance, so that further calls can be chained.</returns>
public IMessageService WithTopicClients()
{
    this._topicClients.Clear();

    // Only the connection string values are needed; the dictionary keys are not used.
    foreach (var connectionString in this._settings.ConnectionStrings.Values)
    {
        this._topicClients.Add(new TopicClient(connectionString, this._settings.Topic.Name));
    }

    return this;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment