Skip to content

Instantly share code, notes, and snippets.

Last active July 24, 2019 23:23
Show Gist options
  • Save srnagar/9e5642d8bdafa0daa257174d5eac6baa to your computer and use it in GitHub Desktop.
Save srnagar/9e5642d8bdafa0daa257174d5eac6baa to your computer and use it in GitHub Desktop.
Java API for EPH

(all names in this doc are not finalized yet)


 * This is the starting point for event processor. To create an instance of this, use the
 * {@link EventProcessorBuilder}
public class EventProcessor {
     * Package-private constructor. Use the {@link EventProcessorBuilder} to create an instance 
    EventProcessor(EventHubAsyncClient eventHubAsyncClient, String consumerGroupName,
        EventPosition eventPosition, PartitionManager partitionManager,
        BiFunction<PartitionContext, CheckpointManager, PartitionProcessor> partitionProcessorFactory,
        EventPosition initialEventPosition) {
     * Starts the event processor
    public Mono<Void> start(){
        return Mono.empty();

     * Stops the event processor
    public Mono<Void> stop() {
        return Mono.empty();

EPH Builder

public class EventProcessorBuilder {

    public EventProcessorBuilder connectionString(String connectionString) {
        return this;

    public EventProcessorBuilder connectionString(String connectionString, String eventHubPath) {
      return this;

    public EventProcessorBuilder configuration(Configuration configuration) {
        return this;

    public EventProcessorBuilder credential(String host, String eventHubPath, TokenCredential credential) {
        return this;

    public EventProcessorBuilder proxyConfiguration(ProxyConfiguration proxyConfiguration) {
        return this;

    public EventProcessorBuilder scheduler(Scheduler scheduler) {
        return this;

    public EventProcessorBuilder transportType(TransportType transport) {
        return this;

    public EventProcessorBuilder timeout(Duration timeout) {
        return this;

    public EventProcessorBuilder retry(Retry retry) {
        return this;

    public EventProcessorBuilder initialEventPosition(Long intialEventPosition) {
        return this;

    public EventProcessorBuilder partitionProcessorFactory(
        BiFunction<PartitionContext, CheckpointManager, PartitionProcessor> partitionProcessorFactory) {
        return this;

     * This will build the EventHubAsyncClient and then use it to build EventProcessor
    public EventProcessor buildEventProcessor() {
        return null;

Partition Context

 * PartitionContext is passed into an EventProrcessor's initialization handler and contains information
 * about the partition the EventProcessor will be processing events from.
class PartionContext {
    String partitionId;
    String consumerGroupName;
    String eventHubName;

Partition Processor

interface PartitionProcessor {

     * Called before Event Processor starts processing a new partition
    Mono<Void> initialize();

     * Called with Event Processor is stopped or if the 
     * ownership of this partition is lost
    Mono<Void> close();

     * The events will be processed in this method when they arrive.
    Mono<Void> processEvents(Flux<EventData> eventData);

     * Called when there is an error in the underlying receiver
    Mono<Void> processError(Throwable throwable);

Checkpoint manager

public class CheckpointManager {
    private PartitionContext partitionContext;
    // The update checkpoint methods in this class will forward the request to 
    // underlying partition manager
    private PartitionManager partitionManager;

     * Updates a checkpoint using the event data
    public Mono<Void> updateCheckpoint(EventData eventData){
        return Mono.empty();

     * Updates a checkpoint using the given offset and sequence number
    public Mono<Void> updateCheckpoint(long sequenceNumber, long offset){
        return Mono.empty();

Partition manager

// Passed into the EventProcessorHost to manage partition ownership and checkpoint creation.
interface PartitionManager {

     * Called to get the list of all existing partition ownership from the underlying data store.
     * Could return empty results if there are is no existing ownership information.
    Flux<PartitionOwnership> listOwnership(String eventHubName, String consumerGroupName);

     * Called to claim ownership of a list of partitions. This will return the list of 
     * partitions that were owned successfully. 
     * @param requestedPartitionOwnerships
     * @return
    Flux<PartitionOwnership> claimOwnership(List<PartitionOwnership> requestedPartitionOwnerships);

     * Updates the checkpoint in the data store for a partition 
    Mono<Void> updateCheckpoint(Checkpoint checkpoint);

Partition ownership

// used by PartitionManager to claim ownership.
// returned by listOwnership
class PartitionOwnership {
    String eventHubName;
    String consumerGroupName;
    String instanceId;
    String partitionId;
    long ownerLevel;
    Long sequenceNumber; // optional
    Long offset; // optional
    Long lastModifiedTime; // optional
    String eTag; // optional


// used by updateCheckpoint in PartitionManager 
public class Checkpoint {
    private String eventHubName;
    private String consumerGroupName;
    private String instanceId;
    private String partitionId;
    private long sequenceNumber;
    private long offsetNumber;

Sample customer facing implementation

 * Sample code to demonstrate how a customer might use {@link EventProcessor}
public class EventProcessorSample {

    private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessorSample.class);

    public static void main(String[] args) {
        EventProcessor eventProcessor = new EventProcessorBuilder()
            .buildEventProcessor();"Starting event processor");
            .subscribe();"Event processor started");
        // do stuff"Stopping event processor");
        eventProcessor.stop().block();"Stopped event processor");

 * A simple implementation of a {@link PartitionProcessor} that logs the 
 * methods called in this implementation to a console
public class ConsolePartitionProcessor implements PartitionProcessor {

    private Logger logger = LoggerFactory.getLogger(ConsolePartitionProcessor.class);

    private PartitionContext partitionContext;
    private CheckpointManager checkpointManager;

    public ConsolePartitionProcessor(PartitionContext partitionContext,
        CheckpointManager checkpointManager) {
        this.partitionContext = partitionContext;
        this.checkpointManager = checkpointManager;

    public Mono<Void> initialize() {"Initializing partition processor: event hub name = " + partitionContext
            .getEventHubName() + "; consumer group name = " + partitionContext
            .getConsumerGroupName() + "; partition id = " + partitionContext.getPartitionId());

        return Mono.empty();

    public Mono<Void> close() {
            "Closing partition processor event hub name = " + partitionContext.getEventHubName()
                + "; consumer group name = " + partitionContext.getConsumerGroupName()
                + "; partition id = " + partitionContext.getPartitionId());
        return Mono.empty();

    public Mono<Void> processEvents(Flux<EventData> eventData) {"Processing events ");
        return Mono.empty();

    public Mono<Void> processError(Throwable throwable) {
        logger.warn("Error while processing partition");
        return Mono.empty();

    private void process(EventData event) {"Processing event with sequence number " + event.sequenceNumber());

 * A simple in-memory implementation of a {@link PartitionManager}
public class InMemoryPartitionManager implements PartitionManager {

    private Map<String, PartitionOwnership> partitionOwnershipMap = new ConcurrentHashMap<>();

    public Flux<PartitionOwnership> listOwnership(String eventHubName, String consumerGroupName) {
        return Flux.fromStream(partitionOwnershipMap.values().stream());

    public Flux<PartitionOwnership> claimOwnership(
        List<PartitionOwnership> requestedPartitionOwnerships) {
        return Flux.fromStream(
            .filter(partitionOwnership -> !partitionOwnershipMap
            .map(partitionOwnership -> {
                partitionOwnershipMap.put(partitionOwnership.getPartitionId(), partitionOwnership);
                return partitionOwnership;

    public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
        return Mono.empty();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment