When receiving events, Event Hubs typically offers two conceptual approaches: requesting a batch of events, or receiving events as they are published, whether by pull (iterating against a generator) or push (a callback handler). The approach used for implementation varies by language, with the track one .NET client opting for push-via-callback.
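As a minimal illustration of the two delivery shapes (a Python sketch with invented names, not the actual client API): in the pull model the caller drives a generator, while in the push model the client drives the loop and invokes a caller-supplied handler.

```python
from typing import Callable, Iterable, Iterator


def pull_events(source: Iterable[str]) -> Iterator[str]:
    # Pull: the caller requests events one at a time by iterating.
    for event in source:
        yield event


def push_events(source: Iterable[str], on_event: Callable[[str], None]) -> None:
    # Push: the client loops over the stream and calls back for each event.
    for event in source:
        on_event(event)


# Pull: the consumer controls pacing by advancing the iterator.
pulled = list(pull_events(["a", "b"]))

# Push: the consumer supplies a handler; the client controls pacing.
pushed = []
push_events(["a", "b"], pushed.append)
```

The difference matters for flow control: pull naturally paces to the consumer, while push requires some other mechanism (prefetch limits, back pressure) when the handler cannot keep up.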
var client = new EventHubClient("<< Connection String >>", "SomeHub");
var consumer = client.CreateConsumer("$Default", "0", EventPosition.Earliest, options);
// No explicit start/stop
var maximumWaitTime = TimeSpan.FromSeconds(1);
await foreach (var eventData in consumer.SubscribeToPartition(maximumWaitTime))
{
...
}
- Developers are in control over when they start/stop receiving.
- If no events are available, we will yield a null after the max wait time.
- It would feel more natural as a property, but that would make passing a max wait time less clear.
- The consumer is in control of processing speed; there's no need to consider back pressure. (The consumer already manages a prefetch queue, so it won't just pull indiscriminately.)
- Multiple subscribers will require an individual channel for events pulled from the partition; trying to use a single source and make them competing consumers would be counter-intuitive and awkward.
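The prefetch-and-max-wait behavior described above can be sketched (in Python, with hypothetical names; the real client's internals differ) as a bounded queue drained by an iterator: the bound keeps prefetching from running ahead of the consumer, and a timed dequeue yields null/None when nothing arrives within the max wait time.

```python
import queue


class PrefetchingConsumer:
    """Sketch: a bounded prefetch queue drained by an iterator.

    Because the queue is bounded by prefetch_count, filling it blocks
    once the consumer falls behind, so pulling is naturally paced and
    no separate back-pressure mechanism is needed.
    """

    def __init__(self, prefetch_count: int, maximum_wait_time: float):
        self._queue = queue.Queue(maxsize=prefetch_count)
        self._maximum_wait_time = maximum_wait_time

    def enqueue(self, event) -> None:
        # In a real client a transport thread fills this queue; the put
        # blocks when prefetch_count events are already buffered.
        self._queue.put(event)

    def __iter__(self):
        while True:
            try:
                yield self._queue.get(timeout=self._maximum_wait_time)
            except queue.Empty:
                # Nothing within the max wait time: yield None so the
                # caller's loop regains control instead of blocking forever.
                yield None


consumer = PrefetchingConsumer(prefetch_count=10, maximum_wait_time=0.05)
consumer.enqueue("event-1")
iterator = iter(consumer)
first = next(iterator)   # "event-1"
second = next(iterator)  # None, once the 50 ms wait elapses
```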
var client = new EventHubClient("<< Connection String >>", "SomeHub");
var consumer = client.CreateConsumer("$Default", "0", EventPosition.Earliest);
Action<EventData> processEvent = eventData => { ... };
// Subscribing returns an IDisposable; when disposed, it unsubscribes.
using (var subscription = consumer.SubscribeToEvents(processEvent))
{
...
}
- Doesn't feel super .NET-like to me. It doesn't particularly seem to cut against the grain, but it feels a bit off.
- The subscription is disposable, so there is deterministic control over its lifespan.
- Feels like a bit of a bastardization of IDisposable. We could return something that has a Stop method or similar, but my intent was to make clear that this wasn't intended to be stop/start.
- Maybe we need to think about start/stop to allow the consumer to apply back pressure if processing can't keep up with the event stream - or will this just use the Prefetch Count?
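The disposable-subscription shape maps naturally onto a context manager; a Python sketch (illustrative names only, not the shipped API) shows delivery starting when the subscription is created and stopping deterministically on dispose, mirroring C#'s `using` over IDisposable:

```python
import threading
import time
from typing import Callable, Iterable


class Subscription:
    """Sketch: delivery runs on a background thread until disposed."""

    def __init__(self, events: Iterable, on_event: Callable):
        self._stopped = threading.Event()
        self._thread = threading.Thread(
            target=self._pump, args=(events, on_event), daemon=True)
        self._thread.start()

    def _pump(self, events, on_event):
        for event in events:
            if self._stopped.is_set():
                break
            on_event(event)

    def dispose(self) -> None:
        # Deterministic end of the subscription's lifespan; no restart.
        self._stopped.set()
        self._thread.join()

    # The context-manager protocol plays the role of C#'s `using`.
    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.dispose()


received = []
with Subscription(["a", "b", "c"], received.append):
    time.sleep(0.2)  # events are delivered while the subscription is live
# Leaving the block disposes the subscription, stopping delivery.
```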
var client = new EventHubClient("<< Connection String >>", "SomeHub");
var consumer = client.CreateConsumer(EventHubConsumer.DefaultConsumerGroupName, "0", EventPosition.Earliest);
var cancellationSource = new CancellationTokenSource();
Action<EventData> processEvent = eventData => { ... };
// Begin receiving events
consumer.SubscribeToEvents(processEvent, cancellationSource.Token);
// Stop receiving events
cancellationSource.Cancel();
- Seems to be clean and makes what is happening obvious.
- Aligns with the cancellation token approach used in other public methods.
- Does not allow for interim start/stop semantics.
- Does not lend itself well to obvious back pressure if the consumer cannot keep up with processing - will this just use the Prefetch Count?
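The cancellation-token shape can be approximated in Python with a threading.Event standing in for the token (a sketch with made-up names): delivery runs until the token is signaled, and there is no way to resume afterwards.

```python
import threading
from typing import Callable, Iterable


def subscribe_to_events(events: Iterable, on_event: Callable,
                        cancellation_token: threading.Event) -> threading.Thread:
    """Sketch: deliver events on a background thread until cancelled."""
    def pump():
        for event in events:
            if cancellation_token.is_set():
                # Cancellation observed: stop receiving; no restart.
                break
            on_event(event)

    thread = threading.Thread(target=pump, daemon=True)
    thread.start()
    return thread


received = []
cancellation_source = threading.Event()

# Begin receiving events.
pump_thread = subscribe_to_events(["a", "b"], received.append,
                                  cancellation_source)
pump_thread.join()  # let the short demo stream drain

# Stop receiving events; cancellation is one-way, so there are no
# interim start/stop semantics.
cancellation_source.set()
```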
var client = new EventHubClient("<< Connection String >>", "SomeHub");
var consumer = client.CreateConsumer(EventHubConsumer.DefaultConsumerGroupName, "0", EventPosition.Earliest);
Action<EventData> processEvent = eventData => { ... };
Action<Exception> processException = exception => { ... };
var maximumWaitTime = TimeSpan.FromSeconds(2);
using (var subscription = consumer.PartitionEvents.Subscribe(processEvent, processException))
{
...
}
- Introduces an external dependency that we are unlikely to want to take.
- Powerful, but not well-known to many .NET developers. (I'm rusty at it, personally. The snippet may not be completely correct and doesn't show off any of the fancy stuff, but I figure we're unlikely to go down this road, so I'm being lazy about it.)
- Allows throttling and other means of providing flow control, so we don't have to consider back pressure.
// There is no default implementation; you must create a custom class
public class ReceiveHandler : IPartitionReceiveHandler
{
public int MaxBatchSize { get; set; } = 50;
public Task ProcessEventsAsync(IEnumerable<EventData> eventBatch) =>
Task.CompletedTask;
public Task ProcessErrorAsync(Exception error) =>
Task.CompletedTask;
}
var client = EventHubClient.CreateFromConnectionString("<< Connection String >>");
var receiver = client.CreateReceiver("$Default", "0", EventPosition.FromStart());
// Begin receiving events; only a single subscriber may be registered. If
// another is set, the current one stops receiving with no notice.
receiver.SetReceiveHandler(new ReceiveHandler());
// Stop receiving events
receiver.SetReceiveHandler(null);
EventHubClient client = new EventHubClientBuilder()
.connectionString(connectionString)
.buildAsyncClient();
EventHubConsumer consumer = client.createConsumer(
EventHubClient.DEFAULT_CONSUMER_GROUP_NAME,
firstPartition,
EventPosition.latest());
// Begin receiving events
Disposable subscription = consumer.receive().subscribe(event -> { ... });
// Stop receiving events
subscription.dispose();
const client = new EventHubClient(connectionString, eventHubName);
const consumer = client.createConsumer("$Default", "0", EventPosition.earliest());
const onMessageHandler: OnMessage = (brokeredMessage: EventData) => { ... };
const onErrorHandler: OnError = (err: MessagingError | Error) => { ... };
// Begin receiving events
const handler = consumer.receive(onMessageHandler, onErrorHandler);
// Stop receiving events
await handler.stop();
const client = new EventHubClient(connectionString, eventHubName);
const consumer = client.createConsumer("$Default", "0", EventPosition.earliest());
// No explicit start/stop
for await (const events of consumer.getEventIterator()) { ... }
client = EventHubClient(
host=HOSTNAME,
event_hub_path=EVENT_HUB,
credential=EventHubSharedKeyCredential(USER, KEY),
network_tracing=False)
consumer = client.create_consumer(consumer_group="$default", partition_id="0", event_position=EVENT_POSITION)
# Receiving is done via iterator on the consumer itself; no explicit start/stop
for item in consumer:
    print(item)