@jsquire
Last active April 11, 2024 21:09
Event Grid: AMQP (early concept)

Things to know before reading

  • The names used in this document are intended for illustration only. Some names are not ideal and will need to be refined during discussions.

  • Details unrelated to the high-level concept are not illustrated; the scope is limited to the high-level shape and paradigms of the feature area.

  • Fake methods are used to illustrate "something needs to happen, but the details are unimportant." As a general rule, if an operation is not directly related to one of the Event Grid types, it can likely be assumed that it is for illustration only. These methods will most often use ellipses for the parameter list, in order to help differentiate them.

Client hierarchy

  • EventGridTopicClient: The top-level client for interacting with a given topic of an Event Grid namespace. It represents a single AMQP connection shared by any sub-clients spawned from it.

  • EventGridQueueReader: A sub-client allowing events from a specific subscription of a topic to be read using a pull-based approach with queue semantics.

  • EventGridQueueProcessor: A sub-client allowing events from a specific subscription of a topic to be read using a push-based approach with queue semantics.

  • EventGridStreamReader: A sub-client allowing events from a specific subscription of a topic to be read using a pull-based approach with stream semantics.

  • EventGridStreamProcessor: A sub-client allowing events from a specific subscription of a topic to be read using a push-based approach with stream semantics.

Open questions

  • Can any service operation be performed at the namespace level, or is every operation bound to a specific topic?

  • What are the intended use cases for queue versus stream semantics? These examples assume that queue readers are interested in deterministic processing with explicit control and stream readers are most interested in throughput.

  • Will checkpoints for streams be external like Event Hubs or broker-owned like Kafka? These examples assume external, despite strong customer feedback for broker-owned checkpoints.

Usage examples

Creating the client

The EventGridTopicClient supports the standard set of constructors for token credentials and key credentials. Because it maintains a stateful connection, it implements IAsyncDisposable and should be disposed when no longer in use.

Create using a key credential

var endpoint = new Uri("<< SOME NAMESPACE URI >>");
var topicName = "<< SOME TOPIC >>";
var credential = new AzureKeyCredential("<< KEY VALUE >>");

await using var client = new EventGridTopicClient(endpoint, topicName, credential);

Create using an Azure.Identity credential

var endpoint = new Uri("<< SOME NAMESPACE URI >>");
var topicName = "<< SOME TOPIC >>";
var credential = new DefaultAzureCredential();

await using var client = new EventGridTopicClient(endpoint, topicName, credential);

Create with optional configuration

var endpoint = new Uri("<< SOME NAMESPACE URI >>");
var topicName = "<< SOME TOPIC >>";
var credential = new DefaultAzureCredential();

var options = new EventGridTopicClientOptions
{
    TransportType = EventGridTransportType.AmqpWebsockets,
    Identifier = "Squire-Node-11",
    ConnectionIdleTimeout = TimeSpan.FromSeconds(90)
};

await using var client = new EventGridTopicClient(
    endpoint,
    topicName,
    credential,
    options);

Publishing events

All events are published in the CloudEvent format, with access to the full AMQP message available for advanced scenarios. Publishing is done using the EventGridTopicClient and follows the existing patterns established for the Azure messaging services.

Publish a single event

var endpoint = new Uri("<< SOME NAMESPACE URI >>");
var topicName = "<< SOME TOPIC >>";
var credential = new DefaultAzureCredential();

await using var client = new EventGridTopicClient(endpoint, topicName, credential);

var cloudEvent = new CloudEvent(
    source: "some-source",
    type: "com.microsoft.squire-example",
    data: BinaryData.FromBytes(new byte[] { 0x1, 0x2 }),
    format: CloudEventDataFormat.Binary);

await client.PublishCloudEventAsync(cloudEvent);

Publish a set of events

var endpoint = new Uri("<< SOME NAMESPACE URI >>");
var topicName = "<< SOME TOPIC >>";
var credential = new DefaultAzureCredential();

await using var client = new EventGridTopicClient(endpoint, topicName, credential);

await client.PublishCloudEventsAsync(new[]
{
    new CloudEvent("some-source", "some-type",  "data!"),
    new CloudEvent("some-source", "other-type", "moardata!")
});

Publish with explicit batching

var endpoint = new Uri("<< SOME NAMESPACE URI >>");
var topicName = "<< SOME TOPIC >>";
var credential = new DefaultAzureCredential();

await using var client = new EventGridTopicClient(endpoint, topicName, credential);

// The batch is replaced each time it fills, so it cannot be declared with
// "using"; it is disposed explicitly instead.
var batch = await client.CreateBatchAsync();

try
{
    while (TryGetNextCloudEvent(..., out var currentEvent))
    {
        if (!batch.TryAdd(currentEvent))
        {
            // If the batch is empty and the event does not fit, then the
            // event is too large to ever be published.
            if (batch.Count == 0)
            {
                throw new Exception("There was an event too large to fit into any batch.");
            }

            // Otherwise, the batch is full.  Publish it and create a new
            // batch for the event that wouldn't fit.
            await client.PublishCloudEventsAsync(batch);

            batch.Dispose();
            batch = await client.CreateBatchAsync();

            if (!batch.TryAdd(currentEvent))
            {
                throw new Exception("There was an event too large to fit into any batch.");
            }
        }
    }

    // If there are any events in the batch, publish a partial
    // batch, as we have no more events.
    if (batch.Count > 0)
    {
        await client.PublishCloudEventsAsync(batch);
    }
}
finally
{
    batch.Dispose();
}

Reading events with queue semantics

Read single events with a pull-based reader

var cancellationToken = GetCancellationToken(...);

var endpoint = new Uri("<< SOME NAMESPACE URI >>");
var topicName = "<< SOME TOPIC >>";
var subscriptionName = "<< SOME SUBSCRIPTION >>";
var credential = new DefaultAzureCredential();

await using var client = new EventGridTopicClient(endpoint, topicName, credential);
await using var reader = client.CreateQueueReader(subscriptionName);

while (true)
{
    var cloudEvent = await reader.ReadCloudEventAsync(cancellationToken);
    
    if (cloudEvent is not null)
    {
        await ProcessEventAsync(cloudEvent, cancellationToken, ...);
        await reader.AcknowledgeCloudEventAsync(cloudEvent);
    }
    
    cancellationToken.ThrowIfCancellationRequested();
}

Read batches with a pull-based reader

var cancellationToken = GetCancellationToken(...);

var endpoint = new Uri("<< SOME NAMESPACE URI >>");
var topicName = "<< SOME TOPIC >>";
var subscriptionName = "<< SOME SUBSCRIPTION >>";
var credential = new DefaultAzureCredential();

await using var client = new EventGridTopicClient(endpoint, topicName, credential);
await using var reader = client.CreateQueueReader(subscriptionName);

while (true)
{
    var eventBatch = await reader.ReadCloudEventsAsync(maxEvents: 10, cancellationToken);
    await ProcessAndAcknowledgeBatchAsync(eventBatch, cancellationToken, ...);
    
    cancellationToken.ThrowIfCancellationRequested();
}

Read with a single dispatch processor

var cancellationToken = GetCancellationToken(...);

var endpoint = new Uri("<< SOME NAMESPACE URI >>");
var topicName = "<< SOME TOPIC >>";
var subscriptionName = "<< SOME SUBSCRIPTION >>";
var credential = new DefaultAzureCredential();

var options = new EventGridQueueProcessorOptions
{
    AutoAcknowledgeMessages = false,
    MaxConcurrentCalls = 2
};

await using var client = new EventGridTopicClient(endpoint, topicName, credential);
await using var processor = client.CreateQueueProcessor(subscriptionName, options);

processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ErrorHandler;

async Task ProcessEventHandler(ProcessCloudEventQueueArgs args)
{
    await ProcessEventAsync(args.CloudEvent, args.CancellationToken, ...);
    await args.AcknowledgeCloudEventAsync(args.CloudEvent);
}

Task ErrorHandler(ProcessErrorEventArgs args) =>
    LogProcessorError(
        args.FullyQualifiedNamespace,
        args.TopicName,
        args.SubscriptionName,
        args.ErrorSource,
        args.Exception.ToString());
        
await processor.StartProcessingAsync();

try
{
    await Task.Delay(Timeout.Infinite, cancellationToken);
}
catch (OperationCanceledException)
{
    // Expected
}
finally
{
    await processor.StopProcessingAsync();
}

Reading events with stream semantics

At the time of writing, the Event Grid service design for streams has not been completed. The proposed client surface is based on Event Hubs patterns and expected Event Grid stream operations.

Read event batches with a pull-based reader

var cancellationToken = GetCancellationToken(...);

var endpoint = new Uri("<< SOME NAMESPACE URI >>");
var topicName = "<< SOME TOPIC >>";
var subscriptionName = "<< SOME SUBSCRIPTION >>";
var credential = new DefaultAzureCredential();

await using var client = new EventGridTopicClient(endpoint, topicName, credential);
await using var reader = client.CreateStreamReader(subscriptionName, EventStreamPosition.Earliest);

while (true)
{
    var eventBatch = await reader.ReadCloudEventsAsync(
        maxBatchSize: 100,
        maxWaitTime: TimeSpan.FromSeconds(15),
        cancellationToken);

    foreach (var cloudEvent in eventBatch)
    {
        await ProcessEventAsync(cloudEvent, cancellationToken, ...);
    }
    
    cancellationToken.ThrowIfCancellationRequested();
}

Read with a batch processor

var cancellationToken = GetCancellationToken(...);

var storageConnectionString = "<< CONNECTION STRING FOR THE STORAGE ACCOUNT >>";
var blobContainerName = "<< NAME OF THE BLOB CONTAINER >>";

var storageClient = new BlobContainerClient(
    storageConnectionString,
    blobContainerName);
    
var checkpointStore = new BlobCheckpointStore(storageClient);

var endpoint = new Uri("<< SOME NAMESPACE URI >>");
var topicName = "<< SOME TOPIC >>";
var subscriptionName = "<< SOME SUBSCRIPTION >>";
var credential = new DefaultAzureCredential();

var options = new EventGridStreamProcessorOptions
{
    MaxBatchSize = 100,
    PrefetchCount = 300,
    MaxWaitTime = TimeSpan.FromSeconds(30),
    MaxConcurrentBatches = 5
};

await using var client = new EventGridTopicClient(endpoint, topicName, credential);
await using var processor = client.CreateStreamProcessor(
    checkpointStore,
    subscriptionName, 
    EventStreamPosition.FromSequenceNumber(1234),
    options);

processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ErrorHandler;

async Task ProcessEventHandler(ProcessCloudEventStreamArgs args)
{
    CloudEvent lastEvent = null;

    foreach (var cloudEvent in args.CloudEvents)
    {
        await ProcessEventAsync(cloudEvent, args.CancellationToken, ...);
        lastEvent = cloudEvent;
    }

    // Checkpoint only when the batch contained at least one event.
    if (lastEvent is not null)
    {
        await args.UpdateCheckpoint(lastEvent);
    }
}

Task ErrorHandler(ProcessErrorEventArgs args) =>
    LogProcessorError(
        args.FullyQualifiedNamespace,
        args.TopicName,
        args.SubscriptionName,
        args.ErrorSource,
        args.Exception.ToString());
        
await processor.StartProcessingAsync();

try
{
    await Task.Delay(Timeout.Infinite, cancellationToken);
}
catch (OperationCanceledException)
{
    // Expected
}
finally
{
    await processor.StopProcessingAsync();
}

Implicit interoperability questions and concerns

  • Each service uses different auth scopes. How would the client understand what scope is needed if authorization fails when pointed at Event Hubs or Service Bus?

  • Event Grid uses different token types for shared keys and SAS. Will they be supported by Service Bus and Event Hubs?

  • The entity paths used by Event Grid do not match those used by Service Bus. I expect the same will be the case for Event Hubs.

  • Partitions are an implicit concept in Event Grid, but in Event Hubs they are explicit and very important. I do not see a way that Event Grid stream readers could work against Event Hubs without new service features.

  • Will Event Grid be using the Service Bus names for management link-based operations and the same structures for request/response? (ref)

  • Will Event Grid be supporting the full AMQP message? If so, will that include the vendor-specific application property types for TimeSpan, URI, and DateTimeOffset? (ref)

  • Will Event Grid be supporting the WebSocket prefix used by Service Bus and Event Hubs? (ref)

  • Will Event Grid be supporting batch-based message settlements? If so, how would that translate to Service Bus?

References and resources
