(Note: none of the names used in this document are final yet.)
EPH APIs
/**
 * Entry point for event processing. Instances are not meant to be constructed directly;
 * obtain one through {@link EventProcessorBuilder}.
 */
public class EventProcessor {

    /**
     * Package-private constructor; use {@link EventProcessorBuilder} to create an instance.
     *
     * In this sketch the body is empty, so none of the arguments are retained.
     * NOTE(review): {@code eventPosition} and {@code initialEventPosition} have the same type
     * and similar names; confirm whether both are really required.
     *
     * @param eventHubAsyncClient the Event Hubs client handed over by the builder
     * @param consumerGroupName name of the consumer group
     * @param eventPosition an event position (see the review note above)
     * @param partitionManager the partition manager implementation
     * @param partitionProcessorFactory creates a {@link PartitionProcessor} from a
     *     {@link PartitionContext} and a {@link CheckpointManager}
     * @param initialEventPosition the initial event position
     */
    EventProcessor(
            EventHubAsyncClient eventHubAsyncClient,
            String consumerGroupName,
            EventPosition eventPosition,
            PartitionManager partitionManager,
            BiFunction<PartitionContext, CheckpointManager, PartitionProcessor> partitionProcessorFactory,
            EventPosition initialEventPosition) {
    }

    /**
     * Starts the event processor.
     *
     * @return a {@code Mono}; this placeholder implementation returns an empty one
     */
    public Mono<Void> start() {
        return Mono.empty();
    }

    /**
     * Stops the event processor.
     *
     * @return a {@code Mono}; this placeholder implementation returns an empty one
     */
    public Mono<Void> stop() {
        return Mono.empty();
    }
}
EPH Builder
/**
 * Fluent builder for {@link EventProcessor}.
 *
 * Every configuration method in this sketch is a placeholder: it ignores its arguments and
 * returns this builder so that calls can be chained.
 */
public class EventProcessorBuilder {

    /**
     * Sets the connection string.
     *
     * @param connectionString the connection string
     * @return this builder
     */
    public EventProcessorBuilder connectionString(String connectionString) {
        return this;
    }

    /**
     * Sets the connection string together with an Event Hub path.
     *
     * @param connectionString the connection string
     * @param eventHubPath the Event Hub path
     * @return this builder
     */
    public EventProcessorBuilder connectionString(String connectionString, String eventHubPath) {
        return this;
    }

    /**
     * Sets the configuration.
     *
     * @param configuration the configuration
     * @return this builder
     */
    public EventProcessorBuilder configuration(Configuration configuration) {
        return this;
    }

    /**
     * Sets a token credential along with the host and Event Hub path it applies to.
     *
     * @param host the host
     * @param eventHubPath the Event Hub path
     * @param credential the token credential
     * @return this builder
     */
    public EventProcessorBuilder credential(String host, String eventHubPath, TokenCredential credential) {
        return this;
    }

    /**
     * Sets the proxy configuration.
     *
     * @param proxyConfiguration the proxy configuration
     * @return this builder
     */
    public EventProcessorBuilder proxyConfiguration(ProxyConfiguration proxyConfiguration) {
        return this;
    }

    /**
     * Sets the scheduler.
     *
     * @param scheduler the scheduler
     * @return this builder
     */
    public EventProcessorBuilder scheduler(Scheduler scheduler) {
        return this;
    }

    /**
     * Sets the transport type.
     *
     * @param transport the transport type
     * @return this builder
     */
    public EventProcessorBuilder transportType(TransportType transport) {
        return this;
    }

    /**
     * Sets the timeout.
     *
     * @param timeout the timeout
     * @return this builder
     */
    public EventProcessorBuilder timeout(Duration timeout) {
        return this;
    }

    /**
     * Sets the retry policy.
     *
     * @param retry the retry policy
     * @return this builder
     */
    public EventProcessorBuilder retry(Retry retry) {
        return this;
    }

    /**
     * Sets the initial event position.
     *
     * NOTE(review): this takes a {@code Long} while the {@link EventProcessor} constructor takes
     * an {@code EventPosition}; confirm which type is intended.
     *
     * @param initialEventPosition the initial event position
     * @return this builder
     */
    public EventProcessorBuilder initialEventPosition(Long initialEventPosition) {
        return this;
    }

    /**
     * Sets the factory used to create a {@link PartitionProcessor} from a
     * {@link PartitionContext} and a {@link CheckpointManager}.
     *
     * @param partitionProcessorFactory the partition processor factory
     * @return this builder
     */
    public EventProcessorBuilder partitionProcessorFactory(
            BiFunction<PartitionContext, CheckpointManager, PartitionProcessor> partitionProcessorFactory) {
        return this;
    }

    /**
     * Intended to build the EventHubAsyncClient and then use it to build the
     * {@link EventProcessor}.
     *
     * @return the event processor; this placeholder implementation returns {@code null}
     */
    public EventProcessor buildEventProcessor() {
        return null;
    }
}
Partition Context
/**
 * Identifies the partition a {@link PartitionProcessor} works on. A PartitionContext is one of
 * the two arguments handed to the partition processor factory and carries the partition id,
 * the consumer group name and the Event Hub name.
 *
 * The class was previously spelled {@code PartionContext}; every other reference in this
 * document uses {@code PartitionContext}, so the declaration is aligned with them. Getters are
 * provided because the sample partition processor reads these values through
 * {@code getEventHubName()}, {@code getConsumerGroupName()} and {@code getPartitionId()}.
 */
class PartitionContext {
    String partitionId;
    String consumerGroupName;
    String eventHubName;

    /**
     * @return the id of the partition, or {@code null} if it has not been set
     */
    String getPartitionId() {
        return partitionId;
    }

    /**
     * @return the consumer group name, or {@code null} if it has not been set
     */
    String getConsumerGroupName() {
        return consumerGroupName;
    }

    /**
     * @return the Event Hub name, or {@code null} if it has not been set
     */
    String getEventHubName() {
        return eventHubName;
    }
}
Partition Processor
/**
* Callbacks invoked for a single partition: initialization, event processing, error handling
* and shutdown. Every callback returns a Mono.
*/
interface PartitionProcessor {
/**
* Called before Event Processor starts processing a new partition.
*/
Mono<Void> initialize();
/**
* Called when Event Processor is stopped or if the
* ownership of this partition is lost.
*/
Mono<Void> close();
/**
* The events will be processed in this method when they arrive.
*
* @param eventData the stream of events to process
*/
Mono<Void> processEvents(Flux<EventData> eventData);
/**
* Called when there is an error in the underlying receiver.
*
* @param throwable the error that occurred
*/
Mono<Void> processError(Throwable throwable);
}
Checkpoint manager
/**
 * Lets a partition processor record checkpoints.
 *
 * The update methods are intended to forward the request to the underlying
 * {@link PartitionManager}; in this sketch they are placeholders that return an empty Mono.
 */
public class CheckpointManager {

    private PartitionContext partitionContext;

    // Target of the forwarded checkpoint updates.
    private PartitionManager partitionManager;

    /**
     * Updates a checkpoint using the event data.
     *
     * @param eventData the event to checkpoint
     * @return a {@code Mono}; this placeholder implementation returns an empty one
     */
    public Mono<Void> updateCheckpoint(EventData eventData) {
        return Mono.empty();
    }

    /**
     * Updates a checkpoint using the given sequence number and offset.
     *
     * @param sequenceNumber the sequence number to checkpoint
     * @param offset the offset to checkpoint
     * @return a {@code Mono}; this placeholder implementation returns an empty one
     */
    public Mono<Void> updateCheckpoint(long sequenceNumber, long offset) {
        return Mono.empty();
    }
}
Partition manager
/**
* Passed into the EventProcessor to manage partition ownership and checkpoint creation.
*/
interface PartitionManager {
/**
* Called to get the list of all existing partition ownership from the underlying data store.
* Could return empty results if there is no existing ownership information.
*
* @param eventHubName name of the Event Hub
* @param consumerGroupName name of the consumer group
* @return the existing partition ownership entries
*/
Flux<PartitionOwnership> listOwnership(String eventHubName, String consumerGroupName);
/**
* Called to claim ownership of a list of partitions. This will return the list of
* partitions that were owned successfully.
*
* @param requestedPartitionOwnerships the partition ownerships being requested
* @return the partition ownerships that were claimed successfully
*/
Flux<PartitionOwnership> claimOwnership(List<PartitionOwnership> requestedPartitionOwnerships);
/**
* Updates the checkpoint in the data store for a partition.
*
* @param checkpoint the checkpoint to store
*/
Mono<Void> updateCheckpoint(Checkpoint checkpoint);
}
Partition ownership
/**
 * Ownership record for a single partition. Used by {@link PartitionManager} to claim ownership
 * and returned by {@code listOwnership}.
 *
 * Accessors are provided because the in-memory partition manager in this document reads and
 * updates ownership through {@code getPartitionId()}, {@code setSequenceNumber(...)} and
 * {@code setOffset(...)}. Setters return this instance so that calls can be chained. The
 * fields keep their original package-private visibility.
 */
class PartitionOwnership {
    String eventHubName;
    String consumerGroupName;
    String instanceId;
    String partitionId;
    long ownerLevel;
    Long sequenceNumber; // optional
    Long offset; // optional
    Long lastModifiedTime; // optional
    String eTag; // optional

    /** @return the Event Hub name */
    String getEventHubName() {
        return eventHubName;
    }

    /**
     * @param eventHubName the Event Hub name
     * @return this instance
     */
    PartitionOwnership setEventHubName(String eventHubName) {
        this.eventHubName = eventHubName;
        return this;
    }

    /** @return the consumer group name */
    String getConsumerGroupName() {
        return consumerGroupName;
    }

    /**
     * @param consumerGroupName the consumer group name
     * @return this instance
     */
    PartitionOwnership setConsumerGroupName(String consumerGroupName) {
        this.consumerGroupName = consumerGroupName;
        return this;
    }

    /** @return the instance id */
    String getInstanceId() {
        return instanceId;
    }

    /**
     * @param instanceId the instance id
     * @return this instance
     */
    PartitionOwnership setInstanceId(String instanceId) {
        this.instanceId = instanceId;
        return this;
    }

    /** @return the partition id */
    String getPartitionId() {
        return partitionId;
    }

    /**
     * @param partitionId the partition id
     * @return this instance
     */
    PartitionOwnership setPartitionId(String partitionId) {
        this.partitionId = partitionId;
        return this;
    }

    /** @return the owner level */
    long getOwnerLevel() {
        return ownerLevel;
    }

    /**
     * @param ownerLevel the owner level
     * @return this instance
     */
    PartitionOwnership setOwnerLevel(long ownerLevel) {
        this.ownerLevel = ownerLevel;
        return this;
    }

    /** @return the sequence number, or {@code null} if none has been recorded */
    Long getSequenceNumber() {
        return sequenceNumber;
    }

    /**
     * @param sequenceNumber the sequence number; may be {@code null} (optional field)
     * @return this instance
     */
    PartitionOwnership setSequenceNumber(Long sequenceNumber) {
        this.sequenceNumber = sequenceNumber;
        return this;
    }

    /** @return the offset, or {@code null} if none has been recorded */
    Long getOffset() {
        return offset;
    }

    /**
     * @param offset the offset; may be {@code null} (optional field)
     * @return this instance
     */
    PartitionOwnership setOffset(Long offset) {
        this.offset = offset;
        return this;
    }

    /** @return the last modified time, or {@code null} if not set */
    Long getLastModifiedTime() {
        return lastModifiedTime;
    }

    /**
     * @param lastModifiedTime the last modified time; may be {@code null} (optional field)
     * @return this instance
     */
    PartitionOwnership setLastModifiedTime(Long lastModifiedTime) {
        this.lastModifiedTime = lastModifiedTime;
        return this;
    }

    /** @return the eTag, or {@code null} if not set */
    String getETag() {
        return eTag;
    }

    /**
     * @param eTag the eTag; may be {@code null} (optional field)
     * @return this instance
     */
    PartitionOwnership setETag(String eTag) {
        this.eTag = eTag;
        return this;
    }
}
Checkpoint
/**
 * Checkpoint data for a single partition. Used by {@code updateCheckpoint} in
 * {@link PartitionManager}.
 *
 * All fields are private, so accessors are required for the class to be usable at all; the
 * in-memory partition manager in this document reads a checkpoint through
 * {@code getPartitionId()}, {@code getSequenceNumber()} and {@code getOffsetNumber()}.
 * Setters return this instance so that calls can be chained.
 */
public class Checkpoint {
    private String eventHubName;
    private String consumerGroupName;
    private String instanceId;
    private String partitionId;
    private long sequenceNumber;
    private long offsetNumber;

    /** @return the Event Hub name */
    public String getEventHubName() {
        return eventHubName;
    }

    /**
     * @param eventHubName the Event Hub name
     * @return this instance
     */
    public Checkpoint setEventHubName(String eventHubName) {
        this.eventHubName = eventHubName;
        return this;
    }

    /** @return the consumer group name */
    public String getConsumerGroupName() {
        return consumerGroupName;
    }

    /**
     * @param consumerGroupName the consumer group name
     * @return this instance
     */
    public Checkpoint setConsumerGroupName(String consumerGroupName) {
        this.consumerGroupName = consumerGroupName;
        return this;
    }

    /** @return the instance id */
    public String getInstanceId() {
        return instanceId;
    }

    /**
     * @param instanceId the instance id
     * @return this instance
     */
    public Checkpoint setInstanceId(String instanceId) {
        this.instanceId = instanceId;
        return this;
    }

    /** @return the partition id */
    public String getPartitionId() {
        return partitionId;
    }

    /**
     * @param partitionId the partition id
     * @return this instance
     */
    public Checkpoint setPartitionId(String partitionId) {
        this.partitionId = partitionId;
        return this;
    }

    /** @return the sequence number */
    public long getSequenceNumber() {
        return sequenceNumber;
    }

    /**
     * @param sequenceNumber the sequence number
     * @return this instance
     */
    public Checkpoint setSequenceNumber(long sequenceNumber) {
        this.sequenceNumber = sequenceNumber;
        return this;
    }

    /** @return the offset */
    public long getOffsetNumber() {
        return offsetNumber;
    }

    /**
     * @param offsetNumber the offset
     * @return this instance
     */
    public Checkpoint setOffsetNumber(long offsetNumber) {
        this.offsetNumber = offsetNumber;
        return this;
    }
}
Sample customer facing implementation
/**
 * Sample showing how a customer might wire up and run an {@link EventProcessor}:
 * build it, start it, do some work, then stop it.
 */
public class EventProcessorSample {

    private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessorSample.class);

    /**
     * Builds an event processor that uses {@link ConsolePartitionProcessor}, starts it, and
     * then stops it again, logging each step.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Configure the builder first, then create the processor from it.
        EventProcessorBuilder builder = new EventProcessorBuilder()
            .connectionString("")
            .partitionProcessorFactory(ConsolePartitionProcessor::new);
        EventProcessor eventProcessor = builder.buildEventProcessor();

        LOGGER.info("Starting event processor");
        // Start is triggered by subscribing; the sample does not wait for it to complete.
        eventProcessor.start().subscribe();
        LOGGER.info("Event processor started");

        // do stuff

        LOGGER.info("Stopping event processor");
        // Unlike start, stop is awaited by blocking on the returned Mono.
        eventProcessor.stop().block();
        LOGGER.info("Stopped event processor");
    }
}
/**
 * A simple {@link PartitionProcessor} that logs every callback it receives.
 */
public class ConsolePartitionProcessor implements PartitionProcessor {

    private final Logger logger = LoggerFactory.getLogger(ConsolePartitionProcessor.class);
    private final PartitionContext partitionContext;
    private final CheckpointManager checkpointManager;

    /**
     * Creates a processor for one partition. The signature matches the partition processor
     * factory ({@code BiFunction<PartitionContext, CheckpointManager, PartitionProcessor>}), so
     * the constructor can be used as a method reference.
     *
     * @param partitionContext identifies the partition being processed
     * @param checkpointManager checkpoint manager for this partition (stored, not used yet)
     */
    public ConsolePartitionProcessor(PartitionContext partitionContext,
            CheckpointManager checkpointManager) {
        this.partitionContext = partitionContext;
        this.checkpointManager = checkpointManager;
    }

    /**
     * Logs the Event Hub name, consumer group name and partition id.
     *
     * @return an empty {@code Mono}
     */
    @Override
    public Mono<Void> initialize() {
        logger.info("Initializing partition processor: event hub name = " + partitionContext
            .getEventHubName() + "; consumer group name = " + partitionContext
            .getConsumerGroupName() + "; partition id = " + partitionContext.getPartitionId());
        return Mono.empty();
    }

    /**
     * Logs that the processor for this partition is being closed.
     *
     * @return an empty {@code Mono}
     */
    @Override
    public Mono<Void> close() {
        logger.info(
            "Closing partition processor event hub name = " + partitionContext.getEventHubName()
                + "; consumer group name = " + partitionContext.getConsumerGroupName()
                + "; partition id = " + partitionContext.getPartitionId());
        return Mono.empty();
    }

    /**
     * Logs each event's sequence number as it arrives.
     *
     * The returned {@code Mono} is derived from {@code eventData}: it completes when the event
     * stream completes and propagates any error from it. Previously the stream was subscribed
     * to internally and an already-completed {@code Mono} was returned, so the caller could
     * neither observe completion or errors nor cancel the processing. Events are now consumed
     * when the caller subscribes to the returned {@code Mono}.
     *
     * @param eventData the stream of events to process
     * @return a {@code Mono} that completes once all events have been logged
     */
    @Override
    public Mono<Void> processEvents(Flux<EventData> eventData) {
        logger.info("Processing events ");
        return eventData.doOnNext(this::process).then();
    }

    /**
     * Logs a warning for an error reported for this partition, including the error itself so
     * that the cause is not lost.
     *
     * @param throwable the error that occurred
     * @return an empty {@code Mono}
     */
    @Override
    public Mono<Void> processError(Throwable throwable) {
        logger.warn("Error while processing partition", throwable);
        return Mono.empty();
    }

    /** Logs the sequence number of a single event. */
    private void process(EventData event) {
        logger.info("Processing event with sequence number " + event.sequenceNumber());
    }
}
/**
* A simple in-memory implementation of a {@link PartitionManager}
*/
public class InMemoryPartitionManager implements PartitionManager {
private Map<String, PartitionOwnership> partitionOwnershipMap = new ConcurrentHashMap<>();
@Override
public Flux<PartitionOwnership> listOwnership(String eventHubName, String consumerGroupName) {
return Flux.fromStream(partitionOwnershipMap.values().stream());
}
@Override
public Flux<PartitionOwnership> claimOwnership(
List<PartitionOwnership> requestedPartitionOwnerships) {
return Flux.fromStream(requestedPartitionOwnerships.stream()
.filter(partitionOwnership -> !partitionOwnershipMap
.containsKey(partitionOwnership.getPartitionId()))
.map(partitionOwnership -> {
partitionOwnershipMap.put(partitionOwnership.getPartitionId(), partitionOwnership);
return partitionOwnership;
}));
}
@Override
public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
partitionOwnershipMap.get(checkpoint.getPartitionId())
.setSequenceNumber(checkpoint.getSequenceNumber());
partitionOwnershipMap.get(checkpoint.getPartitionId())
.setOffset(checkpoint.getOffsetNumber());
return Mono.empty();
}
}