Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save jsquire/fee9e4093499852a5b8f218fb41f995e to your computer and use it in GitHub Desktop.
Save jsquire/fee9e4093499852a5b8f218fb41f995e to your computer and use it in GitHub Desktop.

.NET Event Hubs Client: Streaming Receive Concept (Second Preview)

When receiving events, Event Hubs typically offers two different conceptual approaches: requesting a batch of events or receiving events as they are published, whether by pull (iterating against a generator) or push (callback handler). The approach used for implementation varies by language, with the track one .NET client opting for push-via-callback.

Concepts to Consider for .NET in Track 2

IAsnyc Enumerable

var client = new EventHubClient("<< Connection String >>", "SomeHub");
var consumer = client.CreateConsumer("$Default", "0", EventPosition.Earliest, options);

// No explicit start/stop
var maximumWaitTime = TimeSpan.FromSeconds(1); 

await foreach (var event in consumer.SubscribeToPartition(maximumWaitTime))
{
    ...
}
  • Developers are in control over when they start/stop receiving.

  • If no events are available, we will yield a null after the max wait time.

  • It would feel more natural as a property, but that would make passing a max wait time less clear.

  • The consumer is in control over processing speed; there's no need to consider back pressure. (the consumer manages a prefetch queue already, so it will won't just pull indiscriminately.)

  • Multiple subscribers will require an individual channel for events pulled from the partition; trying to use a single source and make them competing consumers would be counter-intuitive and weird.

Subscribe Pattern

var client = new EventHubClient("<< Connection String >>", "SomeHub");
var consumer = client.CreateConsumer("$Default", "0", EventPosition.Earliest);

Func<EventData> processEvent = eventData => { ... };

// Subscribing returns an IDisposable; when disposed, it unsubcribes.
using (var subscription = consumer.SubcribeToEvents(processEvent))
{
    ...
}
  • Doesn't feel super .NET-like to me. It doesn't particularly seem to cut against the grain, but it feels a bit off.

  • The subscription is disposable, so there is deterministic control over the lifespan.

  • Feels like a bit of a bastardization of IDisposable. We could return something that has a Stop method or similar, but my intent was to make this clear that it wasn't intended to be stop/start.

  • Maybe we need to think about start/stop to allow the consumer to apply back pressure if processing can't keep up with the event stream - or will this just use the Prefetch Count?

Delegate with Cancellation

var client = new EventHubClient("<< Connection String >>", "SomeHub");
var consumer = client.CreateConsumer(EventHubConsumer.DefaultConsumerGroupName, "0", EventPosition.Earliest);

var cancellationSource = new CancellationTokenSource();
Func<EventData> processEvent = eventData => { ... };

// Begin receving events
consumer.SubcribeToEvents(processEvent, cancellationSource.Token);

// Stop receiving events
cancellationSource.Cancel();
  • Seems to be clean and makes what is happening obvious.

  • Aligns with the cancellation token approach used in other public methods.

  • Does not allow for interim start/stop semantics.

  • Does not lend itself well to obvious back pressure if the consumer cannot keep up with the processing - will this just use the Prefetch Count?

Reactive Extensions

var client = new EventHubClient("<< Connection String >>", "SomeHub");
var consumer = client.CreateConsumer(EventHubConsumer.DefaultConsumerGroupName, "0", EventPosition.Earliest);

Func<EventData> processEvent = eventData => { ... };
Func<Exception> processException = exception => { ... };

var maximumWaitTime = TimeSpan.FromSeconds(2);

using (var subscription = consumer.PartitionEvents.Subscribe(processEvent, processException))
{
    ...
}
  • Introduces an external dependency that we are unlikely to want to take.

  • Powerful, but not well-known to many .NET developers. (I'm rusty at it, personally. The snippet may not be completely correct and doesn't show off any of the fancy stuff, but I figure we're unlikely to go down this road, so I'm being lazy about it.)

  • Allows throttling and other means of providing flow control so that we don't have to consider back pressure.

What did .NET do in Track 1?

// There is no default implementation; you must create a custom class
public class ReceiveHandler : IPartitionReceiveHandler
{
    public int MaxBatchSize { get; set; } = 50;
    
    public Task ProcessEventAsny(IEnumerable<EventData> eventBatch) =>
        Task.CompletedTask;
    
    public Task ProcessErrorAsync(Exception error) =>
        Task.CompletedTask;
}

var client = EventHubClient.CreateFromConnectionString("<< Connection String >>");
var receiver = client.CreateReceiver("$Default", "0", EventPosition.FromStart());

// Begin receiving events; only a single subscriber may be registered.  If
// another is set, the current one stops recieving with no notice.
receiver.SetReceiveHandler(new ReceiveHandler());

// Stop receiving events
receiver.SetReceiveHandler(null);

What do the other languages do in Track 2?

Java

EventHubClient client = new EventHubClientBuilder()
    .connectionString(connectionString)
    .buildAsyncClient();
            
EventHubConsumer consumer = client.createConsumer(
    EventHubClient.DEFAULT_CONSUMER_GROUP_NAME, 
    firstPartition, 
    EventPosition.latest());

// Begin receiving events
Disposable subscription = consumer.receive().subscribe(event -> { ... });

// Stop receiving events
subscription.dispose();

TypeScript (subscription)

const client = new EventHubClient(connectionString, eventHubName);
const consumer = client.createConsumer("$Default", "0", EventPosition.earliest());
 
const onMessageHandler: OnMessage = (brokeredMessage: EventData) => { ... };
const onErrorHandler: OnError = (err: MessagingError | Error) => { ... };

// Begin receiving events
const handler = consumer.receive(onMessageHandler, onErrorHandler);

// Stop receiving events
await handler.stop();

TypeScript (iterator)

const client = new EventHubClient(connectionString, eventHubName);
const consumer = client.createConsumer("$Default", "0", EventPosition.earliest());

// No explicit start/stop
for await (const events of consumer.getEventIterator()){ ... }

Python

client = EventHubClient(
    host=HOSTNAME, 
    event_hub_path=EVENT_HUB, 
    credential=EventHubSharedKeyCredential(USER, KEY),
    network_tracing=False)
    
consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EVENT_POSITION)

# Receiving is done via iterator on the consumer itself; no explicit start/stop
for item in consumer:
    print(item)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment