// ---------------------Receive Operations----------------------------------
/* Receive events from all partitions with no load balancing or checkpointing*/
private static void receiveAll() {
EventHubConsumerClient consumerClient = new EventHubClientBuilder()
.connectionString("")
.consumerGroup("")
.buildConsumerClient();
// Have to combine event data and partition context to return a Flux
// of a single type - not every clean
consumerClient.receive().subscribe(eventDataWithContext -> {
// process event
workOnTheEvent(eventDataWithContext.getEventData());
// get the partition info
eventDataWithContext.getContext().getPartitionId();
return Mono.empty();
}));
consumerClient.close();
}
/* Receive events from all partitions with load balancing and checkpointing*/
private static void recieveAll(PartitionManager partitionManager) {
EventHubConsumerClient consumerClient = new EventHubClientBuilder()
.connectionString("")
.consumerGroup("")
.partitionManager(partitionManager)
.buildConsumerClient();
// Have to combine event data and partition context to return a Flux
// of a single type - not every clean
consumerClient.receive().subscribe(eventDataWithContext -> {
// process event
workOnTheEvent(eventDataWithContext.getEventData());
// get the partition info
eventDataWithContext.getContext().getPartitionId();
return eventDataWithContext.updateCheckpoint();
}));
consumerClient.close();
}
/* Receive events from specific partitions */
private static void receiveFromPartitions() {
EventHubConsumerClient consumerClient1 = new EventHubClientBuilder()
.connectionString("")
.consumerGroup("")
.partitionId("1")
.buildConsumerClient();
// Have to combine event data and partition context to return a Flux
// of a single type - not every clean
consumerClient1.receive().subscribe(eventDataWithContext -> {
// process event
workOnTheEvent(eventDataWithContext.getEventData());
// get the partition info
eventDataWithContext.getContext().getPartitionId();
return Mono.empty();
}));
consumerClient1.close();
EventHubConsumerClient consumerClient2 = new EventHubClientBuilder()
.connectionString("")
.consumerGroup("")
.partitionId("2")
.buildConsumerClient();
// Have to combine event data and partition context to return a Flux
// of a single type - not every clean
consumerClient2.receive().subscribe(eventDataWithContext -> {
// process event
workOnTheEvent(eventDataWithContext.getEventData());
// get the partition info
eventDataWithContext.getContext().getPartitionId();
return Mono.empty();
}));
consumerClient2.close();
}
/* Receive events from all partitions with initialize and close */
private static void receiveAllWithInitialization() {
EventHubConsumerClient consumerClient = new EventHubClientBuilder()
.connectionString("")
.consumerGroup("")
.onInitialize(partitionContext -> {
// initialize a partition
})
.onClose((partitionContext, closeReason) -> {
// close a partition
})
.buildConsumerClient();
// Have to combine event data and partition context to return a Flux
// of a single type - not every clean
consumerClient.receive().subscribe(eventDataWithContext -> {
// process event
workOnTheEvent(eventDataWithContext.getEventData());
// get the partition info
eventDataWithContext.getContext().getPartitionId();
return Mono.empty();
}));
consumerClient.close();
}
/* Receive events from all partitions with state management, checkpointing and load balancing */
private static void receiveAllWithInitialization() {
EventHubConsumerClient consumerClient = new EventHubClientBuilder()
.connectionString("")
.consumerGroup("")
.partitionManager(partitionManager)
.buildConsumerClient();
Map<String, State> partitionState = new ConcurrentHashMap<>();
// Have to combine event data and partition context to return a Flux
// of a single type - not every clean
consumerClient.receive().subscribe(eventDataWithContext -> {
// process event
EventData latestEvent = eventDataWithContext.getEventData();
workOnTheEvent(latestEvent);
// get the state for this partition
State myState = partitionState.get(eventDataWithContext.getContext().getPartitionId());
myState.update(latestEvent)
return eventDataWithContext.updateCheckpoint();
}));
consumerClient.close();
}
/**
 * Demonstrates an invalid configuration: a partition manager combined with a
 * specific partition id. Per the sketch, building the client is expected to
 * fail with {@code UnsupportedOperationException}.
 */
private static void receiveWithPartitionManagerThrows() {
    // A partition manager and an explicit partition id are mutually exclusive.
    EventHubConsumerClient client =
        new EventHubClientBuilder()
            .connectionString("")
            .consumerGroup("")
            .partitionManager(partitionManager)
            .partitionId("0")
            .buildConsumerClient(); // throws UnsupportedOperationException
}
/**
 * Demonstrates an invalid configuration: initialize/close handlers combined
 * with a specific partition id. Per the sketch, building the client is
 * expected to fail with {@code UnsupportedOperationException}.
 */
private static void receiveWithInitializationThrows() {
    // Initialize/close handlers and an explicit partition id are mutually
    // exclusive.
    EventHubConsumerClient client =
        new EventHubClientBuilder()
            .connectionString("")
            .consumerGroup("")
            .onInitialize(partitionContext -> {
                // initialize a partition
            })
            .onClose((partitionContext, closeReason) -> {
                // close a partition
            })
            .partitionId("0")
            .buildConsumerClient(); // throws UnsupportedOperationException
}
// Gist metadata (GitHub page residue, not part of the program):
//   Title: EPH pull model
//   Gist: srnagar/d1e2537ce35d8c73823ddc22c209c690
//   Last active: October 10, 2019 00:16