@jsquire
Last active April 26, 2024 19:41
Microsoft Messaging: AMQP client library (early concept)

The high-level goal for the Microsoft Messaging client library is to provide a unified experience for the core messaging scenarios needed by developers who wish to interact with Microsoft Fabric, Azure Event Grid, Azure Service Bus, and Azure Event Hubs.

Things to know before reading

  • The names used in this document are intended for illustration only. Some names are not ideal and will need to be refined during discussions.

  • Some details not related to the high-level messaging operations are not illustrated; the scope of this document is limited to the high-level shape and paradigms for the feature area.

  • Fake methods are used to illustrate "something needs to happen, but the details are unimportant." As a general rule, if an operation is not directly related to one of the messaging types, it can likely be assumed that it is for illustration only. These methods will most often use ellipses for the parameter list, in order to help differentiate them.

Why this is needed

In order to stay competitive in the messaging space, the Azure Messaging services are pivoting to reposition Event Grid as a general purpose publish/subscribe experience for core messaging scenarios. The intent is for developers who do not have specific needs to opt for Event Grid by default and then "grow up" to Service Bus and Event Hubs for more advanced scenarios.

Event Grid also serves as the messaging interface for Microsoft Fabric, providing the default experience for pushing data into the service via AMQP and MQTT. In this form, it has no unique branding and is intended to be viewed by developers as just another part of Microsoft Fabric.

In the past, each client library developed by the Azure SDK team was branded and designed for an individual Azure service. Because messaging presents a unique scenario, the desire is to have a client library that supports the AMQP operations exposed by Event Grid without being directly bound to that brand. The client should be able to work against any Microsoft messaging service that supports that service API, specifically Microsoft Fabric, Azure Event Grid, Azure Event Hubs, and Azure Service Bus.

Goals

  • Provide a "universal messaging" library that does not make reference to "Azure", "Event Grid", "Service Bus", or "Event Hubs" in its package or type names. The goal is to provide an API surface for "Microsoft Messaging" based on the service operations exposed by Event Grid. (Note: this is not intended as a full "unbranded" experience; see Non-Goals.)

  • Support targeting Microsoft Fabric, Azure Event Grid, Azure Service Bus, and Azure Event Hubs for common service operations by changing just the endpoint or connection string.

  • Automatically discover the needed authorization scope for the service target and adjust without additional configuration or hints from the application using the SDK package.

  • Provide a client hierarchy for interacting with Azure Messaging services via the AMQP protocol.

Non-Goals

  • Providing a full "unbranded" experience. It is understood and accepted that the library will have Azure-branded dependencies, "Azure" will appear in stack traces and other non-focal areas, and code will live in the Azure SDK repositories.

  • Support for protocols other than AMQP.

  • Supporting service operations which are not supported by Event Grid.

  • Supporting Event Grid operations which are not supported by Service Bus and Event Hubs.

Assumptions and constraints

  • The Azure Event Grid AMQP API is a superset of functionality; all queue-based operations are supported by Azure Service Bus and all stream-based operations are supported by Azure Event Hubs.

  • Microsoft Fabric, Azure Service Bus, and Azure Event Hubs may provide additional AMQP operations that are not supported by Azure Event Grid. These will not be available in the Microsoft Messaging client library.

  • All services that the client library can target must have the same AMQP API surface for the supported operations.

  • Services that the client library can target may have different authorization scopes. If they do, they must support a common interface for discovering the correct scope to use when an invalid scope causes an auth failure. (The exact mechanism is not yet designed.)
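Because the discovery mechanism is not yet designed, the following is only a hypothetical sketch of one naive approach: try an ordered list of candidate scopes and return the first one the target accepts. The `DiscoverScope` helper, the `authorize` delegate (standing in for acquiring a token and presenting it to the service), and the candidate list are all assumptions for illustration. Microsoft Fabric is omitted because its scope is not stated in this document.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical candidate scopes, tried in order. This list is an assumption;
// Microsoft Fabric is omitted because its scope is not stated in the design.
string[] candidateScopes =
{
    "https://eventgrid.azure.net/.default",
    "https://servicebus.azure.net/.default",
    "https://eventhubs.azure.net/.default"
};

// Returns the first scope for which authorize(scope) succeeds, or null if none do.
// The authorize delegate stands in for acquiring a token and presenting it to the service.
string? DiscoverScope(IEnumerable<string> scopes, Func<string, bool> authorize)
{
    foreach (var scope in scopes)
    {
        if (authorize(scope))
        {
            return scope;
        }
    }

    return null;
}

// Simulated target that only accepts the Service Bus scope, recording each attempt.
var attempts = new List<string>();

bool SimulatedServiceBus(string scope)
{
    attempts.Add(scope);
    return scope == "https://servicebus.azure.net/.default";
}

var discovered = DiscoverScope(candidateScopes, SimulatedServiceBus);

Console.WriteLine(discovered);     // https://servicebus.azure.net/.default
Console.WriteLine(attempts.Count); // 2
```

Each rejected candidate costs a failed authorization attempt. A design in which the service reports the expected scope on failure would avoid these extra round trips.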

Nomenclature

  • Namespace: The top-level container for an Azure Messaging service. Though namespaces are the target of the service endpoint and connections are opened against them, they expose no operations directly to callers.

  • Topic: The entity of interest; a topic lives under a namespace and is the top-level target for operations. This name was chosen because of prior art in the messaging space, where it is used by Event Grid, Service Bus, and Kafka. This does cause a bit of friction for Service Bus, which separates entities into "queues" and "topics", and for Event Hubs, which refers to them as an "Event Hub."

  • Subscription: An optional entity that can be specified as the target for reads. This is required when reading an Event Grid topic or a Service Bus topic. It is not applicable when reading from a Service Bus queue. When using stream semantics, this would correlate to an Event Hubs consumer group.

  • CloudEvent: A data representation based on the CloudEvents specification. All messaging data in the client library will be modeled as CloudEvent instances and use the CloudEvents AMQP protocol binding as the data format for service operations.
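To illustrate what using the CloudEvents AMQP binding means in binary content mode, the sketch below maps a set of CloudEvent attributes onto the parts of an AMQP message. `ToBinaryModeMessage` is a hypothetical helper, not a library API, and the message is modeled as a tuple rather than a real AMQP type. In the sketch, `datacontenttype` becomes the AMQP `content-type` property. The remaining attributes become application properties prefixed with `cloudEvents:`, the prefix used by the binding (later revisions also allow `cloudEvents_`). The event data becomes the body. Structured mode, which encodes the whole event into the body, is not shown.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical sketch of the CloudEvents AMQP binding in binary content mode.
// The AMQP message is modeled as a tuple: content-type, application properties, body.
(string? ContentType, Dictionary<string, object> ApplicationProperties, byte[] Body) ToBinaryModeMessage(
    IDictionary<string, object> attributes,
    byte[] data)
{
    string? contentType = null;
    var properties = new Dictionary<string, object>();

    foreach (var (name, value) in attributes)
    {
        if (name == "datacontenttype")
        {
            // datacontenttype maps to the AMQP content-type property, not an application property.
            contentType = value.ToString();
        }
        else
        {
            // All other attributes become prefixed application properties.
            properties["cloudEvents:" + name] = value;
        }
    }

    // In binary mode, the event data is carried as-is in the message body.
    return (contentType, properties, data);
}

var attributes = new Dictionary<string, object>
{
    ["specversion"] = "1.0",
    ["id"] = "event-1",
    ["source"] = "some-source",
    ["type"] = "com.microsoft.squire-example",
    ["datacontenttype"] = "text/plain"
};

var message = ToBinaryModeMessage(attributes, Encoding.UTF8.GetBytes("data!"));

Console.WriteLine(message.ContentType);                              // text/plain
Console.WriteLine(message.ApplicationProperties["cloudEvents:type"]); // com.microsoft.squire-example
```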

Package naming

Because the client library is intended to be associated with Microsoft, but not with Azure or a specific product, the Azure Messaging team has requested a Microsoft.* package name. Taking into account that the library will be used in the messaging domain and works exclusively with CloudEvent data, a reasonable proposal may be a variant of one of the following:

  • Microsoft.Messaging
  • Microsoft.Messaging.CloudEvents
  • Microsoft.CloudEvents.Messaging
  • Microsoft.Messaging.Amqp
  • Microsoft.Messaging.PubSub

Client hierarchy

  • MessagingTopicClient: The top-level client for interacting with a given topic of a messaging service namespace. It represents a single AMQP connection shared by any sub-clients spawned from it.

  • QueueCloudEventReceiver: A sub-client allowing events from a specific subscription of a topic to be read using a pull-based approach with queue semantics.

  • QueueCloudEventProcessor: A sub-client allowing events from a specific subscription of a topic to be read using a push-based approach with queue semantics.

  • StreamCloudEventReceiver: A sub-client allowing events from a specific subscription of a topic to be read using a pull-based approach with stream semantics.

  • StreamCloudEventProcessor: A sub-client allowing events from a specific subscription of a topic to be read using a push-based approach with stream semantics.

API Skeleton

Please see the API View.

Usage examples

Creating the client

The MessagingTopicClient supports the standard set of constructors for token credentials and key credentials. Because it maintains a stateful connection, it implements IAsyncDisposable and is expected to be disposed when no longer in use.

Create using a key credential

var fullyQualifiedNamespace = "<< Fully Qualified Namespace >>";
var topicName = "<< SOME TOPIC >>";
var credential = new AzureKeyCredential("<< KEY VALUE >>");

await using var client = new MessagingTopicClient(fullyQualifiedNamespace, topicName, credential);

Create using an Azure.Identity credential

var fullyQualifiedNamespace = "<< Fully Qualified Namespace >>";
var topicName = "<< SOME TOPIC >>";
var credential = new DefaultAzureCredential();

await using var client = new MessagingTopicClient(fullyQualifiedNamespace, topicName, credential);

Create with optional configuration

var fullyQualifiedNamespace = "<< Fully Qualified Namespace >>";
var topicName = "<< SOME TOPIC >>";
var credential = new DefaultAzureCredential();

var options = new MessagingTopicClientOptions
{
    TransportType = EventGridTransportType.AmqpWebsockets,
    Identifier = "Squire-Node-11",
    ConnectionIdleTimeout = TimeSpan.FromSeconds(90)
};

await using var client = new MessagingTopicClient(
    fullyQualifiedNamespace,
    topicName,
    credential,
    options);

Publishing events

All events are published in the CloudEvent format, with access to the full AMQP message available for advanced scenarios. Publishing is done using the MessagingTopicClient and follows the existing patterns established for the Azure messaging services.

Publish a single event

var fullyQualifiedNamespace = "<< Fully Qualified Namespace >>";
var topicName = "<< SOME TOPIC >>";
var credential = new DefaultAzureCredential();

await using var client = new MessagingTopicClient(fullyQualifiedNamespace, topicName, credential);

var cloudEvent = new CloudEvent(
  source: "some-source",
  type: "com.microsoft.squire-example",
  data: BinaryData.FromBytes(new byte[] { 0x1, 0x2 }),
  dataContentType: "application/octet-stream",
  dataFormat: CloudEventDataFormat.Binary);
  
await client.PublishCloudEventAsync(cloudEvent);

Publish a set of events

var fullyQualifiedNamespace = "<< Fully Qualified Namespace >>";
var topicName = "<< SOME TOPIC >>";
var credential = new DefaultAzureCredential();

await using var client = new MessagingTopicClient(fullyQualifiedNamespace, topicName, credential);

await client.PublishCloudEventsAsync(new[]
{
    new CloudEvent("some-source", "some-type",  "data!"),
    new CloudEvent("some-source", "other-type", "moardata!")
});

Publish with explicit batching

var fullyQualifiedNamespace = "<< Fully Qualified Namespace >>";
var topicName = "<< SOME TOPIC >>";
var credential = new DefaultAzureCredential();

await using var client = new MessagingTopicClient(fullyQualifiedNamespace, topicName, credential);

// The batch is replaced when it fills, so it is disposed explicitly rather than
// through a using declaration (which does not allow reassignment).
var batch = await client.CreateBatchAsync();

try
{
    while (TryGetNextCloudEvent(..., out var currentEvent))
    {
        if (!batch.TryAdd(currentEvent))
        {
            // If the batch is empty and the event still does not fit,
            // then it is too large to ever be published.
            if (batch.Count == 0)
            {
                throw new Exception("There was an event too large to fit into any batch.");
            }

            // The batch is full.  Publish it and create a new batch
            // for the event that wouldn't fit.
            await client.PublishCloudEventsAsync(batch);

            batch.Dispose();
            batch = await client.CreateBatchAsync();

            // There are no events in the new batch; if the current event
            // does not fit, then it is too large to ever be published.
            if (!batch.TryAdd(currentEvent))
            {
                throw new Exception("There was an event too large to fit into any batch.");
            }
        }
    }

    // If there are any events in the batch, publish a partial
    // batch, as we have no more events.
    if (batch.Count > 0)
    {
        await client.PublishCloudEventsAsync(batch);
    }
}
finally
{
    batch.Dispose();
}
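To make the fill, publish, and retry control flow of the batching loop concrete without a service, the following self-contained sketch replays it against a hypothetical size-limited batch. Events are represented only by their encoded sizes. The `PackIntoBatches` helper records each "published" batch in a list, and the `maxBatchBytes` limit is an illustrative assumption; real batch size limits are determined by the service.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: greedily pack events (represented by their sizes) into batches
// that must not exceed maxBatchBytes, mirroring the TryAdd loop above.
List<List<int>> PackIntoBatches(IEnumerable<int> eventSizes, int maxBatchBytes)
{
    var published = new List<List<int>>();
    var batch = new List<int>();
    var batchBytes = 0;

    // Stand-in for batch.TryAdd: accept the event only if it fits in the remaining space.
    bool TryAdd(int size)
    {
        if (batchBytes + size > maxBatchBytes)
        {
            return false;
        }

        batch.Add(size);
        batchBytes += size;
        return true;
    }

    foreach (var size in eventSizes)
    {
        if (!TryAdd(size))
        {
            // An event that does not fit into an empty batch can never be published.
            if (batch.Count == 0)
            {
                throw new InvalidOperationException($"An event of {size} bytes can never fit into a batch.");
            }

            // The batch is full: "publish" it and start a new, empty batch.
            published.Add(batch);
            batch = new List<int>();
            batchBytes = 0;

            if (!TryAdd(size))
            {
                throw new InvalidOperationException($"An event of {size} bytes can never fit into a batch.");
            }
        }
    }

    // Publish the final partial batch, if any events remain.
    if (batch.Count > 0)
    {
        published.Add(batch);
    }

    return published;
}

var batches = PackIntoBatches(new[] { 40, 30, 50, 20, 10 }, maxBatchBytes: 100);

foreach (var b in batches)
{
    Console.WriteLine(string.Join(", ", b));
}
// 40, 30
// 50, 20, 10
```

The greedy approach publishes a batch as soon as the next event does not fit; it does not search for a tighter packing, which keeps events in their original order.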

Reading events with queue semantics

The patterns for queue-based operations are based on those in the Service Bus library for the receiver and processor, using the terminology preferred by the Event Grid service.

Read single events with a pull-based reader

var cancellationToken = GetCancellationToken(...);

var fullyQualifiedNamespace = "<< Fully Qualified Namespace >>";
var topicName = "<< SOME TOPIC >>";
var subscriptionName = "<< SOME SUBSCRIPTION >>";
var credential = new DefaultAzureCredential();

await using var client = new MessagingTopicClient(fullyQualifiedNamespace, topicName, credential);
await using var reader = client.CreateQueueReceiver(subscriptionName);

while (true)
{
    // A null result indicates that no event was received.
    var cloudEvent = await reader.ReceiveCloudEventAsync(cancellationToken);
    
    if (cloudEvent is not null)
    {
        await ProcessEventAsync(cloudEvent, cancellationToken, ...);
        await reader.AcknowledgeCloudEventAsync(cloudEvent);
    }
    
    cancellationToken.ThrowIfCancellationRequested();
}
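The acknowledgment step matters because of how queue semantics are typically implemented. The following is a simplified, hypothetical in-memory model (not the service's implementation). It assumes lock-based, at-least-once delivery similar to Service Bus peek-lock: a received event is locked for a fixed number of ticks, and acknowledging it removes it permanently. An event whose lock expires without acknowledgment becomes available to be received again. The tick-based clock and the `Receive` and `Acknowledge` helpers are illustrative assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified, hypothetical model of lock-based queue semantics.
var lockDuration = 5;
var available = new List<string> { "e1", "e2" };
var locked = new Dictionary<string, int>(); // event -> tick at which its lock expires

string? Receive(int now)
{
    // Return events whose locks have expired to the queue before handing out the next one.
    foreach (var expired in locked.Where(entry => entry.Value <= now).Select(entry => entry.Key).ToList())
    {
        locked.Remove(expired);
        available.Add(expired);
    }

    if (available.Count == 0)
    {
        return null;
    }

    // Lock the next event rather than removing it outright.
    var next = available[0];
    available.RemoveAt(0);
    locked[next] = now + lockDuration;
    return next;
}

// Acknowledging settles a locked event so that it is never delivered again.
bool Acknowledge(string cloudEvent) => locked.Remove(cloudEvent);

var first = Receive(now: 0);       // "e1" is locked until tick 5
Acknowledge(first!);               // "e1" is settled
var second = Receive(now: 1);      // "e2" is locked until tick 6 and never acknowledged
var third = Receive(now: 2);       // null: nothing is available while "e2" is locked
var redelivered = Receive(now: 6); // "e2" again, because its lock expired

Console.WriteLine($"{first}, {second}, {third ?? "null"}, {redelivered}"); // e1, e2, null, e2
```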

Read batches with a pull-based reader

var cancellationToken = GetCancellationToken(...);

var fullyQualifiedNamespace = "<< Fully Qualified Namespace >>";
var topicName = "<< SOME TOPIC >>";
var subscriptionName = "<< SOME SUBSCRIPTION >>";
var credential = new DefaultAzureCredential();

await using var client = new MessagingTopicClient(fullyQualifiedNamespace, topicName, credential);
await using var reader = client.CreateQueueReceiver(subscriptionName);

while (true)
{
    var eventBatch = await reader.ReceiveCloudEventsAsync(maxEvents: 10, cancellationToken);
    await ProcessAndAcknowledgeBatchAsync(eventBatch, cancellationToken, ...);
    
    cancellationToken.ThrowIfCancellationRequested();
}

Read with a single dispatch processor

var cancellationToken = GetCancellationToken(...);

var fullyQualifiedNamespace = "<< Fully Qualified Namespace >>";
var topicName = "<< SOME TOPIC >>";
var subscriptionName = "<< SOME SUBSCRIPTION >>";
var credential = new DefaultAzureCredential();

var options = new QueueCloudEventProcessorOptions
{
    AutoAcknowledgeMessages = false,
    MaxConcurrentCalls = 2
};

await using var client = new MessagingTopicClient(fullyQualifiedNamespace, topicName, credential);
await using var processor = client.CreateQueueProcessor(subscriptionName, options);

processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ErrorHandler;

async Task ProcessEventHandler(ProcessCloudEventQueueArgs args)
{
    await ProcessEventAsync(args.CloudEvent, args.CancellationToken, ...);
    await args.AcknowledgeCloudEventAsync(args.CloudEvent);
}

Task ErrorHandler(ProcessErrorEventArgs args) =>
    LogProcessorError(
        args.FullyQualifiedNamespace,
        args.TopicName,
        args.SubscriptionName,
        args.ErrorSource,
        args.Exception.ToString());
        
await processor.StartProcessingAsync();

try
{
    await Task.Delay(Timeout.Infinite, cancellationToken);
}
catch (OperationCanceledException)
{
    // Expected
}
finally
{
    await processor.StopProcessingAsync();
    
    processor.ProcessEventAsync -= ProcessEventHandler;
    processor.ProcessErrorAsync -= ErrorHandler;
}

Reading events with stream semantics

At the time of writing, the Event Grid service design for streams has not been completed. The proposed client surface is based on Event Hubs patterns and expected Event Grid stream operations. The terminology for concepts is that preferred by the Event Grid service.

Read event batches with a pull-based reader

var cancellationToken = GetCancellationToken(...);

var fullyQualifiedNamespace = "<< Fully Qualified Namespace >>";
var topicName = "<< SOME TOPIC >>";
var subscriptionName = "<< SOME SUBSCRIPTION >>";
var credential = new DefaultAzureCredential();

await using var client = new MessagingTopicClient(fullyQualifiedNamespace, topicName, credential);
await using var reader = client.CreateStreamReceiver(subscriptionName, EventStreamPosition.Earliest);

while (true)
{
    var eventBatch = await reader.ReceiveCloudEventsAsync(
        maxBatchSize: 100,
        maxWaitTime: TimeSpan.FromSeconds(15),
        cancellationToken);
    
    foreach (var cloudEvent in eventBatch)
    {
        await ProcessEventAsync(cloudEvent, cancellationToken, ...);
    }
    
    cancellationToken.ThrowIfCancellationRequested();
}

Read with a batch processor

var cancellationToken = GetCancellationToken(...);

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var storageClient = new BlobContainerClient(
    storageConnectionString,
    blobContainerName);

// This concrete checkpoint store is just an example; due to the
// dependency on Azure Storage, it would require a separate package.
var checkpointStore = new BlobStreamCloudEventProcessorCheckpointStore(storageClient);

var fullyQualifiedNamespace = "<< Fully Qualified Namespace >>";
var topicName = "<< SOME TOPIC >>";
var subscriptionName = "<< SOME SUBSCRIPTION >>";
var credential = new DefaultAzureCredential();

var options = new StreamCloudEventProcessorOptions
{
    MaxBatchSize = 100,
    PrefetchCount = 300,
    MaxWaitTime = TimeSpan.FromSeconds(30),
    MaxConcurrentBatches = 5
};

await using var client = new MessagingTopicClient(fullyQualifiedNamespace, topicName, credential);
await using var processor = client.CreateStreamProcessor(
    checkpointStore,
    subscriptionName, 
    EventStreamPosition.FromSequenceNumber(1234),
    options);

processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ErrorHandler;

async Task ProcessEventHandler(ProcessCloudEventStreamArgs args)
{
    CloudEvent lastEvent = null;
    
    foreach (var cloudEvent in args.CloudEvents)
    {
        await ProcessEventAsync(cloudEvent, args.CancellationToken, ...);
        lastEvent = cloudEvent;
    }
    
    // Record progress using the last event processed in the batch.
    if (lastEvent is not null)
    {
        await args.UpdateCheckpointAsync(lastEvent);
    }
}

Task ErrorHandler(ProcessErrorEventArgs args) =>
    LogProcessorError(
        args.FullyQualifiedNamespace,
        args.TopicName,
        args.SubscriptionName,
        args.ErrorSource,
        args.Exception.ToString());
        
await processor.StartProcessingAsync();

try
{
    await Task.Delay(Timeout.Infinite, cancellationToken);
}
catch (OperationCanceledException)
{
    // Expected
}
finally
{
    await processor.StopProcessingAsync();
    
    processor.ProcessEventAsync -= ProcessEventHandler;
    processor.ProcessErrorAsync -= ErrorHandler;
}
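Unlike a queue reader, a stream reader does not remove events; it tracks its own position, and checkpoints let it resume. The sketch below is a hypothetical in-memory model, not the Event Grid or Event Hubs implementation. The stream is a list indexed by sequence number, and the checkpoint store is a dictionary keyed by subscription. It assumes, as the Event Hubs processor does, that the configured starting position applies only when no checkpoint exists, and that processing resumes at the event after the last checkpointed sequence number. This document does not specify either behavior for the new library.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model: an append-only stream whose index is the sequence number, and
// a checkpoint store mapping a subscription to the last fully processed sequence number.
var stream = Enumerable.Range(0, 10).Select(n => $"event-{n}").ToList();
var checkpoints = new Dictionary<string, long>();

// Read up to maxBatchSize events, checkpoint the last one, and return what was read.
List<string> ProcessNextBatch(string subscription, long startingSequenceNumber, int maxBatchSize)
{
    // Resume after the checkpoint if one exists; otherwise use the starting position.
    var next = checkpoints.TryGetValue(subscription, out var last)
        ? last + 1
        : startingSequenceNumber;

    // Reading does not remove events from the stream.
    var batch = stream.Skip((int)next).Take(maxBatchSize).ToList();

    if (batch.Count > 0)
    {
        checkpoints[subscription] = next + batch.Count - 1;
    }

    return batch;
}

var firstRun = ProcessNextBatch("sub-a", startingSequenceNumber: 3, maxBatchSize: 4);

// A restart with the same starting position resumes from the checkpoint instead.
var secondRun = ProcessNextBatch("sub-a", startingSequenceNumber: 3, maxBatchSize: 4);

Console.WriteLine(string.Join(", ", firstRun));  // event-3, event-4, event-5, event-6
Console.WriteLine(string.Join(", ", secondRun)); // event-7, event-8, event-9
```

Because the checkpoint is keyed by subscription, a second subscription reading the same stream keeps an independent position.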

Open questions

  • The proposed client hierarchy does not currently support sharing of connections between topics; each topic is assumed to have a unique connection. Based on developer feedback from our Event Hubs and Service Bus libraries, connection sharing is a niche feature.

    Should the initial design introduce a connection type that can be passed at construction (as our Event Hubs clients do), or should we shift to a namespace-level client and introduce a dedicated publisher type (as our Service Bus clients do)?

  • Can any meaningful service operation be performed at the namespace level, or is every operation bound to a specific topic?

  • What are the intended service champion scenarios for queue versus stream semantics? These examples assume that queue readers are interested in deterministic processing with explicit control and stream readers are most interested in throughput.

  • Will checkpoints for streams be external like Event Hubs or broker-owned like Kafka? These examples assume external, despite strong customer feedback for broker-owned checkpoints.

  • Each service uses different auth scopes. How will the client understand what scope is needed when authorization fails while pointed at Event Hubs or Service Bus?

  • Given that partitions are an implicit concept in Event Grid but are both explicit and very important in Event Hubs, what approach will the service(s) take to allow Event Grid-focused stream readers to work against Event Hubs?

  • The format of Cloud Event instances for AMQP has some ambiguity in the AMQP protocol binding spec. After discussion with Clemens, my understanding is that he suggested the spec indicates the following scenarios:

    • A binary Cloud Event uses an AMQP data body (see: 3.1.2)
    • A single structured Cloud Event uses an AMQP value body. (see: 3.2.2)
    • A batch of structured Cloud Events uses an AMQP sequence body. (via Clemens)

    Looking at the official CloudNative.CloudEvents implementation, it does not agree with Clemens' statements. The Cloud Native implementation uses a data-bodied message for both binary and structured mode. We're unsure which is correct.

  • The CloudEvents spec does not mention a batch of binary Cloud Events; it is unknown whether this is, or should be, a supported scenario for Event Grid. Given that interoperability with Service Bus is a goal, batch support would be highly desirable.

  • If my understanding of Clemens' interpretation is correct, there is an interoperability concern that doesn't exist when only data bodies are used. The Service Bus and Event Hubs SDK packages assume a data-bodied AMQP message and expose only that in the ServiceBusMessage and EventData abstractions. Reading/setting a value or sequence body requires accessing the raw AMQP message and interacting with it directly. Developers publishing events to Service Bus or Event Hubs from the Microsoft Messaging SDK package and reading from the Service Bus or Event Hubs SDK packages would have a poor experience.

  • What are the intended service champion scenarios for binary versus structured mode for CloudEvents? If the Microsoft Messaging readers can consume both formats and transform to the CloudEvent model, is there justification to allow publishers to choose a specific format rather than having the client control that decision?

  • What experience do the Microsoft Messaging readers offer for messages/events published by the Service Bus or Event Hubs SDK? These are arbitrary data and not in CloudEvent format. What translation can/should take place? It also seems to necessitate that developers have access to the raw AMQP message for advanced scenarios.

  • If a developer drops down to the raw AMQP message and manipulates it in a way that it includes data beyond what is covered in the CloudEvent AMQP binding, what is the service behavior? Should the Microsoft Messaging library disallow direct AMQP message interactions entirely?

  • What is the error experience for the processor types? Does this follow the Event Hubs model of "don't invoke the error handler for developer code errors" which lets the processing task fault or the Service Bus model of "invoke the error handler for developer code errors and protect against the processing task faulting?"

  • Because Event Grid's HTTP API uses "Publish/Receive" as the terminology for operations, those conventions were used for the types defined herein. Should we snap to existing nomenclature and, if so, which? Both Service Bus and Event Hubs use "Send/Receive" terminology for operation names, though Service Bus uses "Sender/Receiver" for types where Event Hubs uses "Producer/Consumer."
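The two processor error models raised in the open question on error experience differ only in where an exception thrown by developer code ends up. The sketch below is a simplified, hypothetical dispatcher (not the Event Hubs or Service Bus implementation) that contrasts them through a `routeHandlerErrors` flag. When the flag is false, a handler exception faults the processing task, matching the Event Hubs model described above. When the flag is true, the exception is passed to the error handler and processing continues with the next event, matching the Service Bus model described above. `DispatchAsync`, `Handler`, and `ErrorHandler` are illustrative names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical dispatcher contrasting the two error models.
async Task<List<string>> DispatchAsync(
    IEnumerable<string> events,
    Func<string, Task> processEvent,
    Func<Exception, Task> processError,
    bool routeHandlerErrors)
{
    var processed = new List<string>();

    foreach (var cloudEvent in events)
    {
        try
        {
            await processEvent(cloudEvent);
            processed.Add(cloudEvent);
        }
        catch (Exception ex) when (routeHandlerErrors)
        {
            // Service Bus-style: route the developer error and keep processing.
            await processError(ex);
        }
        // Event Hubs-style (routeHandlerErrors == false): the exception escapes and faults the task.
    }

    return processed;
}

var events = new[] { "ok-1", "bad", "ok-2" };
var errors = new List<string>();

// Developer handler that fails for one specific event.
Task Handler(string cloudEvent) =>
    cloudEvent == "bad"
        ? throw new InvalidOperationException($"Could not process {cloudEvent}.")
        : Task.CompletedTask;

Task ErrorHandler(Exception ex)
{
    errors.Add(ex.Message);
    return Task.CompletedTask;
}

var routed = await DispatchAsync(events, Handler, ErrorHandler, routeHandlerErrors: true);
Console.WriteLine(string.Join(", ", routed)); // ok-1, ok-2

var faultedTask = DispatchAsync(events, Handler, ErrorHandler, routeHandlerErrors: false);

try
{
    await faultedTask;
}
catch (InvalidOperationException ex)
{
    Console.WriteLine($"Processing task faulted: {ex.Message}");
}

Console.WriteLine(faultedTask.IsFaulted); // True
```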
