You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session. Dismiss alert
// with connection stringEventHubProducerClientproducerClient = newEventHubClientBuilder()
.connectionString("connection-string")
.buildProducer();
// with connection string and event hub name EventHubProducerClientproducerClient = newEventHubClientBuilder()
.connectionString("connection-string", "event-hub-name")
.buildProducer();
// with credential EventHubProducerClientproducerClient = newEventHubClientBuilder()
.credential("fully-qualified-namespace", "event-hub-name", credential)
.buildProducer();
// with connectionEventHubConnectioneventHubConnection = newEventHubConnection("connection-string");
EventHubProducerClientproducerClient = newEventHubClientBuilder()
.connection(eventHubConnection)
.buildProducer();
Sending events
// Sends a stream of events through producerClient. getEvents() and producerClient
// are defined outside this snippet.
Flux<EventData> events = getEvents();

// send with no partition affinity
producerClient.send(events);

// send with a partition key
SendOptions sendOptions = new SendOptions().partitionKey("my-partition-key");
producerClient.send(events, sendOptions);

// send to a specific partition id
// (sendOptions is redeclared here: this is an alternative to the partition-key variant)
SendOptions sendOptions = new SendOptions().partitionId("0");
producerClient.send(events, sendOptions);
Consume events
Creating a consumer client
// with connection stringEventHubConsumerClientconsumerClient = newEventHubClientBuilder()
.connectionString("connection-string")
.buildConsumer();
// with connection string and event hub nameEventHubConsumerClientconsumerClient = newEventHubClientBuilder()
.connectionString("connection-string", "event-hub-name")
.buildConsumer();
// with credentialEventHubConsumerClientconsumerClient = newEventHubClientBuilder()
.credential("fully-qualified-namespace", "event-hub-name", credential)
.buildConsumer();
// with connectionEventHubConnectioneventHubConnection = newEventHubConnection("connection-string");
EventHubConsumerClientconsumerClient = newEventHubClientBuilder()
.connection(eventHubConnection)
.buildConsumer();
Consume events
EventHubConsumerClientconsumerClient = buildWithOneOfTheMethodsAbove();
// consume events from all partitionsconsumerClient.receive().subscribe(partitionEvent -> {
StringpartitionId = partitionEvent.getPartitionId();
EventDataeventData = partitionEvent.getEventData();
});
// consume events from a specific partitionconsumerClient.receive(partitionId).subscribe(eventData -> {
eventData.getBody();
});
Consume events with load balancing [Names to be discussed]
// Basic modeEventHubDistributedConsumerClientdistributedConsumerClient = newEventHubClientBuilder()
.connectionString("connection-string")
.checkpointStore(checkpointStore)
.consumerGroup("consumer-group")
.buildDistributedConsumer();
distributedConsumerClient.startProcessing((partitionContext, eventData) -> {/* user code */});
// With partition initialization and closeEventHubDistributedConsumerClientdistributedConsumerClient = newEventHubClientBuilder()
.connectionString("connection-string")
.checkpointStore(checkpointStore)
.consumerGroup("consumer-group")
.initializePartition(initializationContext -> {/* user code */})
.closePartition(closeContext -> { /* user code */})
.buildDistributedConsumer();
distributedConsumerClient.startProcessing((partitionContext, eventData) -> {/* user code */});
// With processing error handlerEventHubDistributedConsumerClientdistributedConsumerClient = newEventHubClientBuilder()
.connectionString("connection-string")
.checkpointStore(checkpointStore)
.consumerGroup("consumer-group")
.buildDistributedConsumer();
distributedConsumerClient.startProcessing((partitionContext, eventData) -> {/* user code */}, partitionError -> {/* user code */});
// With consumer error handlerEventHubDistributedConsumerClientdistributedConsumerClient = newEventHubClientBuilder()
.connectionString("connection-string")
.checkpointStore(checkpointStore)
.consumerGroup("consumer-group")
.consumerError(error -> {/* user code */})
.buildDistributedConsumer();
distributedConsumerClient.startProcessing((partitionContext, eventData) -> {/* user code */}, partitionError -> {/* user code */});
distributedConsumerClient.stopProcessing();