Last active
October 28, 2019 09:42
Handling Messages with Geo-Redundant Azure Service Bus via Azure Functions
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Strongly-typed settings for Azure Service Bus.
/// </summary>
public class AzureServiceBusSettings
{
    /// <summary>
    /// Gets or sets the connection strings, each stored under a string key.
    /// </summary>
    public virtual Dictionary<string, string> ConnectionStrings { get; set; }

    /// <summary>
    /// Gets or sets the topic settings.
    /// </summary>
    public virtual AzureServiceBusTopicSettings Topic { get; set; }
}
/// <summary>
/// Strongly-typed settings for an Azure Service Bus topic.
/// </summary>
public class AzureServiceBusTopicSettings
{
    /// <summary>
    /// Gets or sets the topic name.
    /// </summary>
    public virtual string Name { get; set; }

    /// <summary>
    /// Gets or sets the settings of the subscription attached to the topic.
    /// </summary>
    public virtual AzureServiceBusTopicSubscriptionSettings Subscription { get; set; }
}
/// <summary>
/// Strongly-typed settings for an Azure Service Bus topic subscription.
/// </summary>
public class AzureServiceBusTopicSubscriptionSettings
{
    /// <summary>
    /// Gets or sets the subscription name.
    /// </summary>
    public virtual string Name { get; set; }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Application settings that expose the Azure Service Bus settings as a strongly-typed instance.
/// </summary>
public class AppSettings : AppSettingsBase
{
    // Key passed to Config.Get to locate the Service Bus settings.
    private const string ServiceBusSettingsKey = "AzureServiceBus";

    /// <summary>
    /// Initializes a new instance of the <see cref="AppSettings"/> class and
    /// reads the Service Bus settings through <c>Config</c>.
    /// </summary>
    public AppSettings()
    {
        var serviceBus = Config.Get<AzureServiceBusSettings>(ServiceBusSettingsKey);

        ServiceBus = serviceBus;
    }

    /// <summary>
    /// Gets the Azure Service Bus settings read at construction time.
    /// </summary>
    public virtual AzureServiceBusSettings ServiceBus { get; }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// HTTP-triggered function (POST messages/receive) that sets up the subscription
/// clients on the message service and starts receiving with a callback that logs
/// each message ID together with the endpoint of the client that delivered it.
/// </summary>
/// <param name="req">The incoming HTTP request (not read by this function).</param>
/// <param name="log">Logger used for the function and per-message log entries.</param>
/// <returns>An <see cref="OkResult"/> once <c>ReceiveAsync</c> has completed.</returns>
[FunctionName(nameof(MessageReceiveHttpTrigger))]
public async Task<IActionResult> Run(
    [HttpTrigger(AuthorizationLevel.Function, "post", Route = "messages/receive")] HttpRequest req,
    ILogger log)
{
    log.LogInformation("C# HTTP trigger function processed a request.");

    var receiver = this._service.WithSubscriptionClients();

    await receiver
        .ReceiveAsync(async (client, message) =>
        {
            log.LogInformation($"Processed: {message.MessageId} at {client.ServiceBusConnection.Endpoint}");

            await Task.CompletedTask.ConfigureAwait(false);
        })
        .ConfigureAwait(false);

    return new OkResult();
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// HTTP-triggered function (POST messages/send) that sets up the topic clients on
/// the message service and sends a fixed sample payload through them.
/// </summary>
/// <param name="req">The incoming HTTP request (not read by this function).</param>
/// <param name="log">Logger used for the function log entry.</param>
/// <returns>An <see cref="OkObjectResult"/> wrapping the value returned by <c>SendAsync</c>.</returns>
[FunctionName(nameof(MessageSendHttpTrigger))]
public async Task<IActionResult> Run(
    [HttpTrigger(AuthorizationLevel.Function, "post", Route = "messages/send")] HttpRequest req,
    ILogger log)
{
    log.LogInformation("C# HTTP trigger function processed a request.");

    var payload = new SamplePayload() { Message = "Hello World" };

    var sender = this._service.WithTopicClients();
    var result = await sender.SendAsync(payload).ConfigureAwait(false);

    return new OkObjectResult(result);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Function class for the message-send HTTP trigger; receives its message service
/// through constructor injection.
/// </summary>
public class MessageSendHttpTrigger
{
    private readonly IMessageService _service;

    /// <summary>
    /// Initializes a new instance of the <see cref="MessageSendHttpTrigger"/> class.
    /// </summary>
    /// <param name="service">The message service used by the trigger.</param>
    /// <exception cref="ArgumentNullException"><paramref name="service"/> is <c>null</c>.</exception>
    public MessageSendHttpTrigger(IMessageService service)
    {
        if (service is null)
        {
            throw new ArgumentNullException(nameof(service));
        }

        this._service = service;
    }

    ...
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[assembly: FunctionsStartup(typeof(GeoRedundant.FunctionApp.StartUp))] | |
namespace GeoRedundant.FunctionApp | |
{ | |
/// <summary>
/// Functions host start-up class that registers the app settings and services
/// with the dependency injection container.
/// </summary>
public class StartUp : FunctionsStartup
{
    /// <summary>
    /// Registers app settings first, then services, on the builder's service collection.
    /// </summary>
    /// <param name="builder">The functions host builder.</param>
    public override void Configure(IFunctionsHostBuilder builder)
    {
        var services = builder.Services;

        this.ConfigureAppSettings(services);
        this.ConfigureServices(services);
    }

    // Registers AppSettings as a singleton.
    private void ConfigureAppSettings(IServiceCollection services)
        => services.AddSingleton<AppSettings>();

    // Registers MessageService as the transient implementation of IMessageService.
    private void ConfigureServices(IServiceCollection services)
        => services.AddTransient<IMessageService, MessageService>();
}
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"Values": { | |
... | |
"AzureServiceBus__ConnectionStrings__Primary": "[AZURE_SERVICE_BUS_CONNECTION_STRING]", | |
"AzureServiceBus__ConnectionStrings__Secondary": "[AZURE_SERVICE_BUS_CONNECTION_STRING]", | |
"AzureServiceBus__Topic__Name": "my-topic", | |
"AzureServiceBus__Topic__Subscription__Name": "my-topic-subscription" | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Registers a message handler on every configured subscription client. Each handler
/// de-duplicates incoming messages by message ID before invoking
/// <paramref name="callbackToProcess"/>.
/// </summary>
/// <param name="callbackToProcess">
/// Callback invoked with the receiving client and the message for every message that
/// is not treated as a duplicate.
/// </param>
/// <returns>A task that completes once all handlers have been registered.</returns>
/// <exception cref="InvalidOperationException">No subscription client is configured.</exception>
/// <exception cref="AggregateException">
/// The number of exceptions collected by the exception handler equals the number of
/// subscription clients when registration finishes.
/// </exception>
public async Task ReceiveAsync(Func<ISubscriptionClient, Message, Task> callbackToProcess)
{
    // Message IDs seen so far; shared by the handlers registered on every client,
    // hence guarded by msglock.
    var messageIds = new List<string>();
    var msglock = new object();

    // Local function: Handles messages.
    async Task onMessageReceived(ISubscriptionClient client, Message message, int maxMessageDeduplicationCount = 20)
    {
        bool duplicated;

        lock (msglock)
        {
            // An ID that is already tracked marks this message as a duplicate and is
            // removed from the list; otherwise the ID is recorded.
            duplicated = messageIds.Remove(message.MessageId);
            if (!duplicated)
            {
                messageIds.Add(message.MessageId);

                // Keep the list bounded by dropping the oldest tracked ID.
                if (messageIds.Count > maxMessageDeduplicationCount)
                {
                    messageIds.RemoveAt(0);
                }
            }
        }

        if (!duplicated)
        {
            await callbackToProcess(client, message).ConfigureAwait(false);
        }
    }

    var exceptions = new ConcurrentQueue<Exception>();

    // Local function: Handles exceptions.
    async Task onExceptionReceived(ExceptionReceivedEventArgs args)
    {
        exceptions.Enqueue(args.Exception);

        await Task.CompletedTask.ConfigureAwait(false);
    }

    if (!this._subscriptionClients.Any())
    {
        throw new InvalidOperationException("No SubscriptionClient exist");
    }

    foreach (var client in this._subscriptionClients)
    {
        // Arguments follow the local function's parameter order (client, message);
        // the de-duplication size falls back to its declared default.
        client.RegisterMessageHandler(
            (msg, token) => onMessageReceived(client, msg),
            new MessageHandlerOptions(onExceptionReceived) { AutoComplete = true, MaxConcurrentCalls = 1 });
    }

    // NOTE(review): this check runs immediately after registration, so it presumably
    // only sees exceptions reported before any message is handled — confirm intent.
    if (exceptions.Count == this._subscriptionClients.Count)
    {
        throw new AggregateException(exceptions);
    }

    await Task.CompletedTask.ConfigureAwait(false);
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Sends <paramref name="value"/> as a UTF-8 encoded message to every configured
/// topic client, using a clone of the same message (and therefore the same message
/// ID) for each client.
/// </summary>
/// <param name="value">The text to send as the message body.</param>
/// <returns>The generated message ID (a new GUID rendered as a string).</returns>
/// <exception cref="InvalidOperationException">No topic client is configured.</exception>
/// <exception cref="AggregateException">Sending failed on every topic client.</exception>
public async Task<string> SendAsync(string value)
{
    var body = Encoding.UTF8.GetBytes(value);
    var message = new Message(body) { MessageId = Guid.NewGuid().ToString() };

    var exceptions = new ConcurrentQueue<Exception>();

    if (!this._topicClients.Any())
    {
        throw new InvalidOperationException("No TopicClient exist");
    }

    // Fan-out messages.
    foreach (var client in this._topicClients)
    {
        try
        {
            await client.SendAsync(message.Clone()).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            // A failure on one client is recorded rather than rethrown so the
            // remaining clients still get the message.
            exceptions.Enqueue(ex);
        }
    }

    // Throw the exception if all clients fail sending the message.
    if (exceptions.Count == this._topicClients.Count)
    {
        throw new AggregateException(exceptions);
    }

    return message.MessageId;
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Discards the currently held subscription clients and creates one peek-lock
/// <see cref="SubscriptionClient"/> per configured connection string, all targeting
/// the configured topic and subscription.
/// </summary>
/// <returns>This instance, to allow call chaining.</returns>
public IMessageService WithSubscriptionClients()
{
    this._subscriptionClients.Clear();

    foreach (var connection in this._settings.ConnectionStrings)
    {
        var connectionString = connection.Value;

        this._subscriptionClients.Add(
            new SubscriptionClient(
                connectionString,
                this._settings.Topic.Name,
                this._settings.Topic.Subscription.Name,
                ReceiveMode.PeekLock));
    }

    return this;
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Discards the currently held topic clients and creates one
/// <see cref="TopicClient"/> per configured connection string, all targeting the
/// configured topic.
/// </summary>
/// <returns>This instance, to allow call chaining.</returns>
public IMessageService WithTopicClients()
{
    this._topicClients.Clear();

    foreach (var connection in this._settings.ConnectionStrings)
    {
        var connectionString = connection.Value;

        this._topicClients.Add(new TopicClient(connectionString, this._settings.Topic.Name));
    }

    return this;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment