Skip to content

Instantly share code, notes, and snippets.

@srnagar
Last active October 11, 2019 21:50
Show Gist options
  • Save srnagar/68f0d06b862eaa5ca94f6d3a1f7f2622 to your computer and use it in GitHub Desktop.
Save srnagar/68f0d06b862eaa5ca94f6d3a1f7f2622 to your computer and use it in GitHub Desktop.
Event Hubs API

Publish events

Creating a producer client

// with connection string
EventHubProducerClient producerClient = new EventHubClientBuilder()
                                        .connectionString("connection-string")
                                        .buildProducer();
// with connection string and event hub name                                        
EventHubProducerClient producerClient = new EventHubClientBuilder()
                                        .connectionString("connection-string", "event-hub-name")
                                        .buildProducer();    
// with credential                                        
EventHubProducerClient producerClient = new EventHubClientBuilder()
                                        .credential("fully-qualified-namespace", "event-hub-name", credential)
                                        .buildProducer(); 

// with connection
EventHubConnection eventHubConnection = new EventHubConnection("connection-string");
EventHubProducerClient producerClient = new EventHubClientBuilder()
                                        .connection(eventHubConnection)
                                        .buildProducer();

Sending events

Flux<EventData> events = getEvents();

// send with no partition affinity
producerClient.send(events);

// send with a partition key
SendOptions sendOptions = new SendOptions().partitionKey("my-partition-key");
producerClient.send(events, sendOptions);

// send to a specific partition id
SendOptions sendOptions = new SendOptions().partitionId("0");
producerClient.send(events, sendOptions);

Consume events

Creating a consumer client

// with connection string
EventHubConsumerClient consumerClient = new EventHubClientBuilder()
                                        .connectionString("connection-string")
                                        .buildConsumer();
// with connection string and event hub name
EventHubConsumerClient consumerClient = new EventHubClientBuilder()
                                        .connectionString("connection-string", "event-hub-name")
                                        .buildConsumer();

// with credential
EventHubConsumerClient consumerClient = new EventHubClientBuilder()
                                        .credential("fully-qualified-namespace", "event-hub-name", credential)
                                        .buildConsumer();
// with connection
EventHubConnection eventHubConnection = new EventHubConnection("connection-string");
EventHubConsumerClient consumerClient = new EventHubClientBuilder()
                                        .connection(eventHubConnection)
                                        .buildConsumer();                                        

Consume events

EventHubConsumerClient consumerClient = buildWithOneOfTheMethodsAbove();

// consume events from all partitions
consumerClient.receive().subscribe(partitionEvent -> {
        String partitionId = partitionEvent.getPartitionId();
        EventData eventData = partitionEvent.getEventData();
    });

// consume events from a specific partition
consumerClient.receive(partitionId).subscribe(eventData -> {
        eventData.getBody();
    });    

Consume events with load balancing [Names to be discussed]

// Basic mode
EventHubDistributedConsumerClient distributedConsumerClient = new EventHubClientBuilder()
                                        .connectionString("connection-string")
                                        .checkpointStore(checkpointStore)
                                        .consumerGroup("consumer-group")
                                        .buildDistributedConsumer(); 
                                        
distributedConsumerClient.startProcessing((partitionContext, eventData) -> {/* user code */});

// With partition initialization and close
EventHubDistributedConsumerClient distributedConsumerClient = new EventHubClientBuilder()
                                        .connectionString("connection-string")
                                        .checkpointStore(checkpointStore)
                                        .consumerGroup("consumer-group")
                                        .initializePartition(initializationContext -> {/* user code */})
                                        .closePartition(closeContext -> { /* user code */})
                                        .buildDistributedConsumer();
distributedConsumerClient.startProcessing((partitionContext, eventData) -> {/* user code */});

// With processing error handler
EventHubDistributedConsumerClient distributedConsumerClient = new EventHubClientBuilder()
                                        .connectionString("connection-string")
                                        .checkpointStore(checkpointStore)
                                        .consumerGroup("consumer-group")
                                        .buildDistributedConsumer();
distributedConsumerClient.startProcessing((partitionContext, eventData) -> {/* user code */}, partitionError -> {/* user code */});

// With consumer error handler
EventHubDistributedConsumerClient distributedConsumerClient = new EventHubClientBuilder()
                                        .connectionString("connection-string")
                                        .checkpointStore(checkpointStore)
                                        .consumerGroup("consumer-group")
                                        .consumerError(error -> {/* user code */})
                                        .buildDistributedConsumer();
distributedConsumerClient.startProcessing((partitionContext, eventData) -> {/* user code */}, partitionError -> {/* user code */});

distributedConsumerClient.stopProcessing();

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment