The initial release (5.0.1) of the Event Hubs client library supports receiving events primarily through three consumer clients:
- EventHubConsumerClient
- EventHubAsyncConsumerClient
- EventProcessorClient
The sync client, EventHubConsumerClient, already supports receiving events in batches through the following methods, so no changes are required for it.
public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, int maximumMessageCount,
    EventPosition startingPosition, Duration maxWaitTime);
public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, int maximumMessageCount,
    EventPosition startingPosition, Duration maxWaitTime, ReceiveOptions receiveOptions);
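To make the sync batching contract concrete, the sketch below models receiveFromPartition(partitionId, maximumMessageCount, startingPosition, maxWaitTime) with only the JDK: it returns up to maximumMessageCount events and waits at most maxWaitTime in total to build up the batch. BoundedReceive and receiveUpTo are hypothetical names, and a BlockingQueue stands in for the partition; this is an illustration of the contract, not how the SDK implements the call.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a bounded receive; not the Event Hubs SDK implementation.
final class BoundedReceive {

    /**
     * Returns up to maxCount items from source, waiting no longer than maxWaitTime in total.
     * The result may hold fewer than maxCount items (possibly none) if the deadline passes first.
     */
    static <T> List<T> receiveUpTo(BlockingQueue<T> source, int maxCount, Duration maxWaitTime) {
        List<T> batch = new ArrayList<>(maxCount);
        long deadline = System.nanoTime() + maxWaitTime.toNanos();
        while (batch.size() < maxCount) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // overall wait budget exhausted
            }
            T item;
            try {
                item = source.poll(remaining, TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status, return what we have
                break;
            }
            if (item == null) {
                break; // deadline reached while waiting for the next item
            }
            batch.add(item);
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<String> partition = new LinkedBlockingQueue<>(List.of("e1", "e2", "e3"));
        // Asks for 5 items but only 3 are queued, so this returns after roughly 200 ms.
        System.out.println(receiveUpTo(partition, 5, Duration.ofMillis(200))); // prints [e1, e2, e3]
    }
}
```

Measuring maxWaitTime as one overall deadline (rather than per event) keeps the call's total latency bounded no matter how slowly events trickle in.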
The async client, EventHubAsyncConsumerClient, can receive events in batches by applying Reactor's bufferTimeout() operator. This requires users to learn how to batch with Reactor, so we may instead provide a convenience API on the async consumer, as shown below.
public Flux<PartitionEvent> receive();
public Flux<PartitionEvent> receive(boolean startReadingAtEarliestEvent);
public Flux<PartitionEvent> receive(boolean startReadingAtEarliestEvent, ReceiveOptions receiveOptions);
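// Demonstrates a convenient batch API on the client
public Flux<List<PartitionEvent>> receiveBatch(int maxBatchSize, Duration maxWaitTime) {
    // Emit a batch once maxBatchSize events are buffered or maxWaitTime elapses, whichever comes first.
    return this.receive().bufferTimeout(maxBatchSize, maxWaitTime);
}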
Because the existing receive methods return a Flux, users can already do the batching themselves, as shown below. Adding a receiveBatch() method to the client is therefore a trade-off between increasing the number of APIs and expecting users to find samples and/or learn how to batch with Reactor types.
The following snippet receives events in batches of up to 100 events with a maximum wait time of 10 seconds. As soon as 100 events are available, a List<PartitionEvent> is emitted; if 100 events do not arrive within 10 seconds, a list containing however many events (fewer than 100) were received up to that point is emitted instead.
Flux<List<PartitionEvent>> fluxOfEventBatches = consumerAsyncClient.receive()
    .bufferTimeout(100, Duration.ofSeconds(10));
Scenarios and semantics are documented here
In EventProcessorClient, users provide a callback that handles a single event, so upon receiving an event, the user has to process it before the next event is received. Some customers of this API have asked for support for receiving events in batches, so that they can process a batch of events and run expensive post-processing tasks just once per batch. Running these tasks after each event is not desirable, and the current form of our receive API forces users to do the batching on their own.
To simplify this for users, the proposal is to add an additional callback that receives events in batches. The user provides a callback that handles a batch of events and configures the maximum number of events per batch (maxBatchSize) and the maximum duration to wait (maxWaitTime). The callback is invoked whenever the service returns maxBatchSize events or when maxWaitTime has elapsed. When maxWaitTime has elapsed, the callback is invoked with an empty batch or with a batch containing fewer than maxBatchSize events. The empty batch serves as a heartbeat when there are no events available in Event Hubs.
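Consumer<List<EventContext>> eventBatchConsumer = eventContextBatch -> {
    eventContextBatch.forEach(eventContext -> processSingleEvent(eventContext));
    // The batch may be empty (a heartbeat) when maxWaitTime elapses, so only checkpoint if it has events.
    if (!eventContextBatch.isEmpty()) {
        // Use the last event in the batch to update the checkpoint.
        eventContextBatch.get(eventContextBatch.size() - 1).updateCheckpoint();
    }
};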
int maxBatchSize = 10;
// This client processes events in batches of 10 events. If there are no events, it waits indefinitely,
// and the callback is invoked as soon as events arrive, without any delay.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
    .connectionString("")
    .consumerGroup("")
    .processEventBatch(eventBatchConsumer, maxBatchSize)
    .processError(errorHandler)
    .checkpointStore(checkpointStore)
    .buildEventProcessorClient();
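The comment above does not spell out how many events a batch holds when fewer than 10 are waiting. One possible reading, sketched below with only the JDK, is that the processor blocks until at least one event arrives and then delivers it together with whatever is already buffered, up to maxBatchSize. NoWaitBatch and nextBatch are hypothetical names and a BlockingQueue stands in for the partition; this is an assumption about the intended semantics, not the SDK's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of batching without maxWaitTime; one possible reading, not the SDK implementation.
final class NoWaitBatch {

    /**
     * Blocks until at least one event is available, then returns it together with any events that are
     * already buffered, up to maxBatchSize in total. No extra time is spent waiting to fill the batch.
     */
    static <T> List<T> nextBatch(BlockingQueue<T> source, int maxBatchSize) {
        List<T> batch = new ArrayList<>(maxBatchSize);
        try {
            batch.add(source.take()); // waits indefinitely for the first event
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status; return an empty batch
            return batch;
        }
        source.drainTo(batch, maxBatchSize - 1); // add whatever else is ready, without blocking
        return batch;
    }
}
```

Under this reading, a burst of 25 buffered events would be delivered as batches of 10, 10 and 5, while a single late event is delivered alone as soon as it arrives.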
// Overload with max wait time
Duration maxWaitTime = Duration.ofSeconds(10);
// This client processes events in batches of 10 events if available, or waits up to 10 seconds
// before invoking the callback with however many events were received up to that point.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
    .connectionString("")
    .consumerGroup("")
    .processEventBatch(eventBatchConsumer, maxBatchSize, maxWaitTime)
    .processError(errorHandler)
    .checkpointStore(checkpointStore)
    .buildEventProcessorClient();
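The processEventBatch(eventBatchConsumer, maxBatchSize, maxWaitTime) semantics proposed here (deliver a batch when maxBatchSize events are collected or maxWaitTime elapses, with an empty batch acting as a heartbeat) can be modeled with only the JDK. This is a sketch under stated assumptions: BatchDispatcher, dispatchBatches and maxRounds are hypothetical, a BlockingQueue stands in for the partition, and each wait window is assumed to open when the previous callback returns.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical model of processEventBatch(callback, maxBatchSize, maxWaitTime); not the SDK implementation.
final class BatchDispatcher {

    /**
     * Repeatedly fills a batch until it holds maxBatchSize events or maxWaitTime has elapsed since the
     * batch window opened, then hands it to the callback. An empty batch acts as a heartbeat.
     * maxRounds bounds the loop for this demo; a real processor would keep going until it is stopped.
     */
    static <T> void dispatchBatches(BlockingQueue<T> source, int maxBatchSize, Duration maxWaitTime,
                                    int maxRounds, Consumer<List<T>> callback) {
        for (int round = 0; round < maxRounds; round++) {
            List<T> batch = new ArrayList<>(maxBatchSize);
            long deadline = System.nanoTime() + maxWaitTime.toNanos();
            while (batch.size() < maxBatchSize) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break; // maxWaitTime elapsed: deliver a partial (possibly empty) batch
                }
                T event;
                try {
                    event = source.poll(remaining, TimeUnit.NANOSECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // stop dispatching, preserving interrupt status
                    return;
                }
                if (event == null) {
                    break; // no further events arrived before the deadline
                }
                batch.add(event);
            }
            callback.accept(batch); // a fresh list per round, so the callback may keep a reference
        }
    }
}
```

With five queued events, maxBatchSize = 2 and four rounds, the callback sees [1, 2], [3, 4], [5] and then an empty heartbeat batch.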
Similarly, the single-event handler can accept a maxWaitTime: if no events are received within that time, the callback is invoked with a null event to serve as a heartbeat.
Consumer<EventContext> processEventConsumer = eventContext -> {
    // getEventData() is null when the callback is invoked as a heartbeat after maxWaitTime.
    if (eventContext.getEventData() != null) {
        System.out.printf("Received event: partition = %s, event data = %s%n",
            eventContext.getPartitionContext().getPartitionId(),
            eventContext.getEventData().getBodyAsString());
        eventContext.updateCheckpoint();
    }
};
Duration maxWaitTime = Duration.ofSeconds(10);
// This client will process one event at a time, but if there are no incoming events for 10 seconds the callback
// will be invoked with a null event. Note that the partition context will still be available for every invocation.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
    .connectionString("")
    .consumerGroup("")
    .processEvent(processEventConsumer, maxWaitTime)
    .processError(errorHandler)
    .checkpointStore(checkpointStore)
    .buildEventProcessorClient();
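The null-heartbeat behavior of processEvent(processEventConsumer, maxWaitTime) can be modeled the same way. In this JDK-only sketch, HeartbeatDispatcher, dispatchEvents and maxRounds are hypothetical, a BlockingQueue stands in for the partition, and a bare null stands in for the proposal's EventContext whose getEventData() returns null (in the proposal, the partition context itself remains available).

```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical model of a single-event handler with a heartbeat; not the EventProcessorClient implementation.
final class HeartbeatDispatcher {

    /**
     * Invokes handler once per event; if no event arrives within maxWaitTime, invokes it with null instead.
     * maxRounds bounds the loop for this demo; a real processor would run until it is stopped.
     */
    static <T> void dispatchEvents(BlockingQueue<T> source, Duration maxWaitTime, int maxRounds,
                                   Consumer<T> handler) {
        for (int round = 0; round < maxRounds; round++) {
            T event;
            try {
                event = source.poll(maxWaitTime.toNanos(), TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop dispatching, preserving interrupt status
                return;
            }
            handler.accept(event); // null means "no event within maxWaitTime" (heartbeat)
        }
    }
}
```

Because the handler always runs at least once per maxWaitTime, it can use the heartbeat for periodic work such as flushing state even when the partition is idle.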