Skip to content

Instantly share code, notes, and snippets.

@srnagar
Last active November 18, 2019 21:18
Show Gist options
  • Save srnagar/3132d53002dc25a316208cd700718012 to your computer and use it in GitHub Desktop.
Save srnagar/3132d53002dc25a316208cd700718012 to your computer and use it in GitHub Desktop.

Querying Event Hub properties

public void queryEventHubProperties() {
    EventHubProducerClient client = new EventHubClientBuilder()
        .connectionString("connection-string")
        .buildProducerClient();

    EventHubProperties eventHubProperties = client.getEventHubProperties();
    System.out.println("Name = " + eventHubProperties.getName());
    System.out.println("Number of partitions = " + eventHubProperties.getPartitionIds().stream().count());
    System.out.println("Event Hub creation time = " + eventHubProperties.getCreatedAt());
}

Send events to Event Hub

public void sendEvents(Iterator<EventData> events) {
    EventHubProducerClient producerClient = new EventHubClientBuilder()
        .connectionString("connection-string")
        .buildProducerClient();

    EventDataBatch eventDataBatch = producerClient.createBatch();
    while (events.hasNext()) {
        EventData event = events.next();
        if (!eventDataBatch.tryAdd(event)) {
            producerClient.send(eventDataBatch);
            eventDataBatch = producerClient.createBatch();
        }
    }

    if (eventDataBatch.getCount() > 0) {
        producerClient.send(eventDataBatch);
    }
    producerClient.close();
}

Send events to a specific partition

public void sendEventsToPartition(Iterator<EventData> events) {
    EventHubProducerClient producerClient = new EventHubClientBuilder()
        .connectionString("connection-string")
        .buildProducerClient();

    CreateBatchOptions batchOptions = new CreateBatchOptions()
        .setPartitionId("0");
    EventDataBatch eventDataBatch = producerClient.createBatch(batchOptions);
    while (events.hasNext()) {
        EventData event = events.next();
        if (!eventDataBatch.tryAdd(event)) {
            producerClient.send(eventDataBatch);
            eventDataBatch = producerClient.createBatch(batchOptions);
        }
    }

    if (eventDataBatch.getCount() > 0) {
        producerClient.send(eventDataBatch);
    }
    producerClient.close();
}

Send events with a partition key

public void sendEventsWithPartitionKey(Iterator<EventData> events) {
    EventHubProducerClient producerClient = new EventHubClientBuilder()
        .connectionString("connection-string")
        .buildProducerClient();

    CreateBatchOptions batchOptions = new CreateBatchOptions()
        .setPartitionKey("username");
    EventDataBatch eventDataBatch = producerClient.createBatch(batchOptions);
    while (events.hasNext()) {
        EventData event = events.next();
        if (!eventDataBatch.tryAdd(event)) {
            producerClient.send(eventDataBatch);
            eventDataBatch = producerClient.createBatch(batchOptions);
        }
    }

    if (eventDataBatch.getCount() > 0) {
        producerClient.send(eventDataBatch);
    }
    producerClient.close();
}

Receive events from all partitions

public void receiveEvents() {
    EventHubConsumerAsyncClient consumerClient = new EventHubClientBuilder()
        .connectionString("connection-string")
        .consumerGroup("consumer-group")
        .buildAsyncConsumerClient();

    consumerClient.receive(EventPosition.earliest())
        .subscribe(event -> {
            System.out.println("Sequence number = " + event.getEventData().getSequenceNumber());
            System.out.println("Partition id = " + event.getPartitionContext().getPartitionId());
        });
    consumerClient.close();
}

Receive events from a specific partition

public void receiveEventsFromPartition() {
    EventHubConsumerAsyncClient consumerClient = new EventHubClientBuilder()
        .connectionString("connection-string")
        .consumerGroup("consumer-group")
        .buildAsyncConsumerClient();

    String partitionId = "0";
    consumerClient.receive(partitionId, EventPosition.earliest())
        .subscribe(event -> {
            System.out.println("Sequence number = " + event.getEventData().getSequenceNumber());
            System.out.println("Partition id = " + event.getPartitionContext().getPartitionId());
        });
    consumerClient.close();
}

Process events using an Event Processor

public void eventProcessor(CheckpointStore checkpointStore) {
    Consumer<ProcessorEvent> eventHandler = event -> {
        System.out.println("Sequence number = " + event.getEventData().getSequenceNumber());
        System.out.println("Partition id = " + event.getPartitionContext().getPartitionId());
    };

    Consumer<ProcessorErrorContext> errorHandler = errorContext -> {
        System.out.println("Partition id = " + errorContext.getPartitionContext().getPartitionId());
        System.out.println("Error message = " + errorContext.getThrowable().getMessage());
    };

    EventProcessorClient processorClient = new EventProcessorClientBuilder()
        .connectionString("connection-string")
        .consumerGroup("consumer-group")
        .checkpointStore(checkpointStore)
        .processEvent(eventHandler)
        .processError(errorHandler)
        .buildEventProcessorClient();

    processorClient.start();
    sleep(10, TimeUnit.SECONDS);
    processorClient.stop();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment