@srnagar
Last active March 24, 2020 01:43
API for receiving events in batches

Receive batch of events

The initial release (5.0.1) of the Event Hubs client library supports receiving events primarily through three consumer clients:

  • EventHubConsumerClient
  • EventHubAsyncConsumerClient
  • EventProcessorClient

EventHubConsumerClient

The synchronous client already supports receiving events in batches through the receiveFromPartition overloads below, so no changes are required.

public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, int maximumMessageCount,
    EventPosition startingPosition, Duration maxWaitTime);

public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, int maximumMessageCount,
    EventPosition startingPosition, Duration maxWaitTime, ReceiveOptions receiveOptions);
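To make the batch semantics of these overloads concrete, here is a stdlib-only sketch that models a partition as a BlockingQueue. The class and method names (SyncBatchReceiveModel, receiveBatch) are hypothetical, and the model only assumes the behavior implied by the parameters: return once maximumMessageCount events have been collected or maxWaitTime has elapsed, whichever comes first. It does not reflect how the SDK actually fetches events from the service.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory model of receiveFromPartition's batch semantics.
// NOT the SDK implementation: a BlockingQueue stands in for the partition.
final class SyncBatchReceiveModel {
    // Returns when maximumMessageCount events are collected or maxWaitTime elapses,
    // whichever comes first. The result may be partial or empty.
    static <T> List<T> receiveBatch(BlockingQueue<T> partition, int maximumMessageCount,
                                    Duration maxWaitTime) {
        List<T> batch = new ArrayList<>(maximumMessageCount);
        long deadline = System.nanoTime() + maxWaitTime.toNanos();
        while (batch.size() < maximumMessageCount) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // wait time elapsed: return whatever has been collected
            }
            try {
                T event = partition.poll(remaining, TimeUnit.NANOSECONDS);
                if (event == null) {
                    break; // no further event arrived before the deadline
                }
                batch.add(event);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                break;
            }
        }
        return batch;
    }
}
```

Calling receiveBatch repeatedly on the same queue mirrors calling receiveFromPartition in a loop: each call yields at most maximumMessageCount events and never blocks longer than maxWaitTime.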

EventHubAsyncConsumerClient

With the async client, users can already receive events in batches by applying Reactor's bufferTimeout() operator to the returned Flux. Because this requires users to know Reactor, we may instead provide a convenience API on the async consumer, as shown below.

public Flux<PartitionEvent> receive();
public Flux<PartitionEvent> receive(boolean startReadingAtEarliestEvent);
public Flux<PartitionEvent> receive(boolean startReadingAtEarliestEvent, ReceiveOptions receiveOptions);

// Proposed convenience API: emits a batch once maxBatchSize events have been
// received or maxWaitTime has elapsed, whichever comes first
public Flux<List<PartitionEvent>> receiveBatch(int maxBatchSize, Duration maxWaitTime) {
    return this.receive().bufferTimeout(maxBatchSize, maxWaitTime);
}

User code without the convenience API

Because the existing receive methods return a Flux, users can already implement batching themselves, as shown below. Adding a receiveBatch() method to the client is therefore a trade-off between increasing the number of APIs and expecting users to find samples or learn how to batch with Reactor operators.

The following snippet receives events in batches of up to 100 events with a maximum wait time of 10 seconds. As soon as 100 events have been buffered, a List<PartitionEvent> is emitted; if 100 events do not arrive within 10 seconds, a list containing the events received so far (fewer than 100) is emitted instead.

Flux<List<PartitionEvent>> fluxOfEventBatches = consumerAsyncClient.receive()
    .bufferTimeout(100, Duration.ofSeconds(10));
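The grouping performed by bufferTimeout can be modeled deterministically without Reactor. The sketch below (BufferTimeoutModel and group are hypothetical names) takes event arrival times in milliseconds and returns the batches that would be emitted. It assumes, based on our reading of Reactor's bufferTimeout, that each batch's timer starts when its first event arrives and that empty batches are never emitted; verify those details against the Reactor documentation before relying on them.

```java
import java.util.ArrayList;
import java.util.List;

// Deterministic model of bufferTimeout(maxSize, maxWait) grouping.
// Input: event arrival times in milliseconds, sorted ascending.
// Assumptions: a batch's timer starts at its first event; empty batches are not emitted.
final class BufferTimeoutModel {
    static List<List<Long>> group(long[] arrivalMillis, int maxSize, long maxWaitMillis) {
        List<List<Long>> batches = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long batchStart = 0;
        for (long t : arrivalMillis) {
            // If the open batch's timer fired before this event arrived, flush it first.
            if (!current.isEmpty() && t >= batchStart + maxWaitMillis) {
                batches.add(current);
                current = new ArrayList<>();
            }
            if (current.isEmpty()) {
                batchStart = t; // first event of a new batch starts its timer
            }
            current.add(t);
            if (current.size() == maxSize) {
                batches.add(current); // size limit reached: emit immediately
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            batches.add(current); // the pending timer eventually flushes leftovers
        }
        return batches;
    }
}
```

For example, with maxSize = 3 and maxWaitMillis = 10, arrivals at 0, 1, 2, 3, 15, 30 and 31 ms are grouped as [0, 1, 2] (size limit reached), [3] and [15] (timeouts), and [30, 31] (final flush).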

EventProcessorClient

Scenarios and semantics are documented here

With EventProcessorClient, users provide a callback that handles a single event, so each event must be processed before the next one is delivered. Some customers have asked for the ability to receive events in batches, so that expensive post-processing tasks can be performed once per batch rather than after every event. Today, the single-event callback forces users to implement this batching themselves.

To simplify this, the proposal adds a callback that receives events in batches. The user provides a callback that handles a batch of events and configures the maximum number of events per batch (maxBatchSize) and, optionally, the maximum duration to wait (maxWaitTime). The callback is invoked as soon as maxBatchSize events have been received or when maxWaitTime has elapsed, whichever comes first. In the latter case, the batch may contain fewer than maxBatchSize events or none at all; an empty batch serves as a heartbeat when no events are available in the Event Hub.

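Consumer<List<EventContext>> eventBatchConsumer = eventContextBatch -> {
    // processSingleEvent is the application's own (user-defined) per-event handler
    eventContextBatch.forEach(eventContext -> processSingleEvent(eventContext));
    // Checkpoint once per batch using the last event; skip empty (heartbeat) batches
    if (!eventContextBatch.isEmpty()) {
        eventContextBatch.get(eventContextBatch.size() - 1).updateCheckpoint();
    }
};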

int maxBatchSize = 10;

// This client processes events in batches of up to 10 events. If no events are available, it waits
// indefinitely (no heartbeat), and the callback is invoked as soon as events arrive, without extra delay.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
    .connectionString("")
    .consumerGroup("")
    .processEventBatch(eventBatchConsumer, maxBatchSize)
    .processError(errorHandler)
    .checkpointStore(checkpointStore)
    .buildEventProcessorClient();

// Overload with max wait time
Duration maxWaitTime = Duration.ofSeconds(10);

// This client delivers a batch as soon as 10 events are available, or waits up to 10 seconds
// before invoking the callback with however many events (possibly none) were received until that point.
EventProcessorClient eventProcessorClientWithMaxWait = new EventProcessorClientBuilder()
    .connectionString("")
    .consumerGroup("")
    .processEventBatch(eventBatchConsumer, maxBatchSize, maxWaitTime)
    .processError(errorHandler)
    .checkpointStore(checkpointStore)
    .buildEventProcessorClient();
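The dispatch semantics proposed for processEventBatch (deliver a batch once maxBatchSize events are available, otherwise after maxWaitTime with a partial or empty batch) can be sketched for a single partition using only the JDK. BatchDispatchModel, dispatch and the iterations parameter are hypothetical; a BlockingQueue stands in for the partition, and the sketch says nothing about how EventProcessorClient is or would be implemented.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical model of the proposed processEventBatch(consumer, maxBatchSize, maxWaitTime)
// semantics for one partition. Not the EventProcessorClient implementation.
final class BatchDispatchModel {
    // Runs `iterations` dispatch cycles. Each cycle invokes onBatch as soon as maxBatchSize
    // events are collected, or after maxWaitTime with a partial or empty (heartbeat) batch.
    static <T> void dispatch(BlockingQueue<T> partition, int maxBatchSize, Duration maxWaitTime,
                             Consumer<List<T>> onBatch, int iterations) {
        for (int i = 0; i < iterations; i++) {
            List<T> batch = new ArrayList<>(maxBatchSize);
            partition.drainTo(batch, maxBatchSize); // take events that are already buffered
            long deadline = System.nanoTime() + maxWaitTime.toNanos();
            while (batch.size() < maxBatchSize) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break; // maxWaitTime elapsed
                }
                try {
                    T event = partition.poll(remaining, TimeUnit.NANOSECONDS);
                    if (event == null) {
                        break; // timed out waiting for more events
                    }
                    batch.add(event);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            onBatch.accept(batch); // invoked even when the batch is empty
        }
    }
}
```

Because the callback can receive an empty heartbeat batch, any callback written against this model, like eventBatchConsumer above, must guard its once-per-batch checkpoint with an emptiness check.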

Heartbeat for single event handler

Similarly, the single-event handler can accept a maxWaitTime: if no event is received within that time, the callback is invoked with a null event, which serves as a heartbeat.

Consumer<EventContext> processEventConsumer = eventContext -> {
    // A null event signals a heartbeat: no event arrived within maxWaitTime
    if (eventContext.getEventData() != null) {
        System.out.printf("Received event: partition = %s, event data = %s%n",
            eventContext.getPartitionContext().getPartitionId(),
            eventContext.getEventData().getBodyAsString());
        eventContext.updateCheckpoint();
    }
};

Duration maxWaitTime = Duration.ofSeconds(10);

// This client will process one event at a time, but if there are no incoming events for 10 seconds the callback 
// will be invoked with a null event. Note that the partition context will still be available for every invocation.
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
    .connectionString("")
    .consumerGroup("")
    .processEvent(processEventConsumer, maxWaitTime)
    .processError(errorHandler)
    .checkpointStore(checkpointStore)
    .buildEventProcessorClient();
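The heartbeat for the single-event handler can be modeled the same way: wait up to maxWaitTime for the next event and invoke the handler with null if none arrives. HeartbeatModel, dispatch and the iterations parameter are hypothetical, with a BlockingQueue standing in for the partition; this is a sketch of the proposed semantics, not the SDK's behavior.

```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical model of the proposed processEvent(consumer, maxWaitTime) heartbeat:
// the handler receives the next event, or null if none arrives within maxWaitTime.
final class HeartbeatModel {
    static <T> void dispatch(BlockingQueue<T> partition, Duration maxWaitTime,
                             Consumer<T> onEvent, int iterations) {
        for (int i = 0; i < iterations; i++) {
            T event;
            try {
                // Returns null when maxWaitTime elapses without an event
                event = partition.poll(maxWaitTime.toNanos(), TimeUnit.NANOSECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            onEvent.accept(event); // null signals a heartbeat
        }
    }
}
```

This is why processEventConsumer above checks getEventData() for null before reading or checkpointing the event.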