Skip to content

Instantly share code, notes, and snippets.

@srnagar
Created October 16, 2019 21:36
Show Gist options
  • Save srnagar/fcbd428f15c010ee0e27fba16d82d777 to your computer and use it in GitHub Desktop.
Save srnagar/fcbd428f15c010ee0e27fba16d82d777 to your computer and use it in GitHub Desktop.
Event Hubs Java API review
    // ---------------------Send Operations----------------------------------
    /* Send events without specifying partition */
    private static void send(List<EventData> eventDataList) {
        EventHubProducerClient producerClient = new EventHubClientBuilder()
            .connectionString("")
            .buildProducerClient();
        producerClient.send(eventDataList);
    }

    /* Send events using a partition key - not a specific partition id */
    private static void sendWithPartitionKey(List<EventData> eventDataList) {
        EventHubProducerClient producerClient = new EventHubClientBuilder()
            .connectionString("")
            .buildProducerClient();
        SendOptions sendOptions = new SendOptions().setPartitionKey("partitionKey");
        producerClient.send(eventDataList, sendOptions);
    }

    /* Send events to a specific partition id */
    private static void sendToPartition(List<EventData> eventDataList) {
        EventHubProducerClient producerClient = new EventHubClientBuilder()
            .connectionString("")
            .buildProducerClient();
        SendOptions sendOptions = new SendOptions().setPartitionId("1");
        producerClient.send(eventDataList, sendOptions);
    }

    /*
       Send events to same partition, but dont re-use AMQP link, but use the same connection
       This mimics Track 1's creation of multiple producers from the same client
       Need to check with service if this is a case we should care about.
       Regardless, this is an advanced case, can be an add on, need not be in GA
   */
    private static void sendToPartitionSeparateLinks(List<EventData> eventDataList) {
        EventHubConnection eventHubConnection = new EventHubConnection(connectionString);
        EventHubClientBuilder clientBuilder = new EventHubClientBuilder().connection(eventHubConnection);
        EventHubProducerClient producerClient1 = clientBuilder.buildProducerClient();
        EventHubProducerClient producerClient2 = clientBuilder.buildProducerClient();

        SendOptions sendOptions1 = new SendOptions().setPartitionId("1");
        SendOptions sendOptions2 = new SendOptions().setPartitionId("1");
        producerClient1.send(eventDataList, sendOptions1);
        producerClient2.send(eventDataList, sendOptions2);
    }


    // ---------------------Receive Operations----------------------------------
    /* Receive events from all partitions with no load balancing or checkpointing*/
    private static void receiveAll() {
        EventHubConsumerClient consumerClient = new EventHubClientBuilder()
            .connectionString("")
            .consumerGroup("")
            .processEvents((ctx, eventData) -> {
                // process event
                ctx.getPartitionId();
                return Mono.empty();
            })
            .buildConsumerClient();

        consumerClient.start();
        consumerClient.stop();
    }

    /* Receive events from all partitions with load balancing and checkpointing*/
    private static void recieveAll(PartitionManager partitionManager) {
        EventHubConsumerClient consumerClient = new EventHubClientBuilder()
            .connectionString("")
            .consumerGroup("")
            .partitionManager(partitionManager)
            .processEvents((ctx, eventData) -> {
                // process event
                ctx.getPartitionId();
                return ctx.updateCheckpoint(eventData);
            })
            .buildConsumerClient();

        consumerClient.start();
        consumerClient.stop();
    }

    /* Receive events from specific partitions */
    private static void receiveFromPartitions() {
        EventHubConnection eventHubConnection = new EventHubConnection(connectionString);
        EventHubPartitionConsumer eventHubPartitionConsumer1 = new EventHubPartitionConsumer(eventHubConnection,
            consumerGroup, partitionId1, eventPosition);
        EventHubPartitionConsumer eventHubPartitionConsumer2 = new EventHubPartitionConsumer(eventHubConnection,
            consumerGroup, partitionId2, eventPosition);

        eventHubPartitionConsumer1.receive().subscribe(eventData -> processEvents1(eventData));
        eventHubPartitionConsumer2.receive().subscribe(eventData -> processEvents2(eventData));
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment