Skip to content

Instantly share code, notes, and snippets.

@srnagar
Last active October 10, 2019 00:16
Show Gist options
  • Save srnagar/d1e2537ce35d8c73823ddc22c209c690 to your computer and use it in GitHub Desktop.
Save srnagar/d1e2537ce35d8c73823ddc22c209c690 to your computer and use it in GitHub Desktop.
EPH pull model
    // ---------------------Receive Operations----------------------------------
    /* Receive events from all partitions with no load balancing or checkpointing*/
    private static void receiveAll() {
        EventHubConsumerClient consumerClient = new EventHubClientBuilder()
            .connectionString("")
            .consumerGroup("")
            .buildConsumerClient();

	// Have to combine event data and partition context to return a Flux
	// of a single type - not every clean
        consumerClient.receive().subscribe(eventDataWithContext -> {
                // process event
                workOnTheEvent(eventDataWithContext.getEventData());
                // get the partition info
                eventDataWithContext.getContext().getPartitionId();
                return Mono.empty();
            }));
        consumerClient.close();
    }

    /* Receive events from all partitions with load balancing and checkpointing*/
    private static void recieveAll(PartitionManager partitionManager) {
        EventHubConsumerClient consumerClient = new EventHubClientBuilder()
            .connectionString("")
            .consumerGroup("")
            .partitionManager(partitionManager)
            .buildConsumerClient();

	// Have to combine event data and partition context to return a Flux
	// of a single type - not every clean
        consumerClient.receive().subscribe(eventDataWithContext -> {
                // process event
                workOnTheEvent(eventDataWithContext.getEventData());
                // get the partition info
                eventDataWithContext.getContext().getPartitionId();
                return eventDataWithContext.updateCheckpoint();
            }));
        consumerClient.close();
    }

    /* Receive events from specific partitions */
    private static void receiveFromPartitions() {
        EventHubConsumerClient consumerClient1 = new EventHubClientBuilder()
            .connectionString("")
            .consumerGroup("")
            .partitionId("1")
            .buildConsumerClient();

	// Have to combine event data and partition context to return a Flux
	// of a single type - not every clean
        consumerClient1.receive().subscribe(eventDataWithContext -> {
                // process event
                workOnTheEvent(eventDataWithContext.getEventData());
                // get the partition info
                eventDataWithContext.getContext().getPartitionId();
                return Mono.empty();
            }));
        consumerClient1.close();
        
        EventHubConsumerClient consumerClient2 = new EventHubClientBuilder()
            .connectionString("")
            .consumerGroup("")
            .partitionId("2")
            .buildConsumerClient();

	// Have to combine event data and partition context to return a Flux
	// of a single type - not every clean
        consumerClient2.receive().subscribe(eventDataWithContext -> {
                // process event
                workOnTheEvent(eventDataWithContext.getEventData());
                // get the partition info
                eventDataWithContext.getContext().getPartitionId();
                return Mono.empty();
            }));
        consumerClient2.close();
    }
    
    /* Receive events from all partitions with initialize and close */
    private static void receiveAllWithInitialization() {
        EventHubConsumerClient consumerClient = new EventHubClientBuilder()
            .connectionString("")
            .consumerGroup("")
            .onInitialize(partitionContext -> {
            	// initialize a partition
            })
            .onClose((partitionContext, closeReason) -> {
            	// close a partition
            })
            .buildConsumerClient();

	// Have to combine event data and partition context to return a Flux
	// of a single type - not every clean
        consumerClient.receive().subscribe(eventDataWithContext -> {
                // process event
                workOnTheEvent(eventDataWithContext.getEventData());
                // get the partition info
                eventDataWithContext.getContext().getPartitionId();
                return Mono.empty();
            }));
        consumerClient.close();
    }
    
    
    /* Receive events from all partitions with state management, checkpointing and load balancing */
    private static void receiveAllWithInitialization() {
        EventHubConsumerClient consumerClient = new EventHubClientBuilder()
            .connectionString("")
            .consumerGroup("")
            .partitionManager(partitionManager)
            .buildConsumerClient();
            
        Map<String, State> partitionState = new ConcurrentHashMap<>();

	// Have to combine event data and partition context to return a Flux
	// of a single type - not every clean
        consumerClient.receive().subscribe(eventDataWithContext -> {
                // process event
                EventData latestEvent = eventDataWithContext.getEventData();
                workOnTheEvent(latestEvent);
                // get the state for this partition
                State myState = partitionState.get(eventDataWithContext.getContext().getPartitionId());
                
                myState.update(latestEvent)
                return eventDataWithContext.updateCheckpoint();
            }));
            
        consumerClient.close();
    }
    
    /* Receive events from specific partitions with partition manager will crash */
    private static void receiveWithPartitionManagerThrows() {
    	EventHubConsumerClient consumerClient = new EventHubClientBuilder()
            .connectionString("")
            .consumerGroup("")
            .partitionManager(partitionManager)
            .partitionId("0") // can't have partition manager and partition id
            .buildConsumerClient();
       // throws UnsupportedOperationException    
    }
    
    /* Receive events from specific partitions with initialize/close handlers will crash */
    private static void receiveWithInitializationThrows() {
    	EventHubConsumerClient consumerClient = new EventHubClientBuilder()
            .connectionString("")
            .consumerGroup("")
            .onInitialize(partitionContext -> {
            	// initialize a partition
            })
            .onClose((partitionContext, closeReason) -> {
            	// close a partition
            })
            .partitionId("0") // can't have intialize/close handlers and partition id
            .buildConsumerClient();
       // throws UnsupportedOperationException    
    }
    
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment