Skip to content

Instantly share code, notes, and snippets.

@srnagar
Last active July 24, 2019 23:23
Show Gist options
  • Save srnagar/9e5642d8bdafa0daa257174d5eac6baa to your computer and use it in GitHub Desktop.
Save srnagar/9e5642d8bdafa0daa257174d5eac6baa to your computer and use it in GitHub Desktop.
Java API for EPH

(None of the names in this doc are finalized yet.)

EPH APIs

/**
 * Starting point for event processing. Instances are meant to be created through
 * {@link EventProcessorBuilder}; the only constructor is package-private.
 *
 * <p>NOTE(review): this is an API sketch. The constructor body is empty (no argument is
 * stored) and {@link #start()} / {@link #stop()} return an empty {@code Mono} without
 * doing any work yet.
 */
public class EventProcessor {
    
    /**
     * Package-private constructor. Use the {@link EventProcessorBuilder} to create an instance.
     *
     * <p>NOTE(review): {@code eventPosition} and {@code initialEventPosition} are both of type
     * {@code EventPosition} and look redundant - confirm which one is intended to remain.
     *
     * @param eventHubAsyncClient client for the Event Hub - presumably used to receive events; TODO confirm
     * @param consumerGroupName name of the consumer group to process
     * @param eventPosition see the review note above
     * @param partitionManager store used for partition ownership and checkpoints
     * @param partitionProcessorFactory creates a {@link PartitionProcessor} from a
     *     {@link PartitionContext} and a {@link CheckpointManager}
     * @param initialEventPosition see the review note above
     */
    EventProcessor(EventHubAsyncClient eventHubAsyncClient, String consumerGroupName,
        EventPosition eventPosition, PartitionManager partitionManager,
        BiFunction<PartitionContext, CheckpointManager, PartitionProcessor> partitionProcessorFactory,
        EventPosition initialEventPosition) {
    }
    
    /**
     * Starts the event processor.
     *
     * @return a {@code Mono} for the start operation; the current stub returns an empty one
     */
    public Mono<Void> start(){
        return Mono.empty();
    }

    /**
     * Stops the event processor.
     *
     * @return a {@code Mono} for the stop operation; the current stub returns an empty one
     */
    public Mono<Void> stop() {
        return Mono.empty();
    }
}

EPH Builder

/**
 * Fluent builder for {@link EventProcessor}.
 *
 * <p>Every setter returns this builder so calls can be chained. NOTE(review): in this API
 * sketch the setters do not record their arguments yet and {@link #buildEventProcessor()}
 * returns {@code null}.
 */
public class EventProcessorBuilder {

    /**
     * Sets the connection string used to reach the Event Hub.
     *
     * @param connectionString the connection string
     * @return this builder
     */
    public EventProcessorBuilder connectionString(String connectionString) {
        return this;
    }

    /**
     * Sets the connection string together with an explicit Event Hub path.
     *
     * @param connectionString the connection string
     * @param eventHubPath path (name) of the Event Hub
     * @return this builder
     */
    public EventProcessorBuilder connectionString(String connectionString, String eventHubPath) {
        return this;
    }

    /**
     * Sets the configuration to use.
     *
     * @param configuration the configuration
     * @return this builder
     */
    public EventProcessorBuilder configuration(Configuration configuration) {
        return this;
    }

    /**
     * Sets token-based credentials for a given host and Event Hub path.
     *
     * @param host host name of the Event Hubs namespace
     * @param eventHubPath path (name) of the Event Hub
     * @param credential the token credential
     * @return this builder
     */
    public EventProcessorBuilder credential(String host, String eventHubPath, TokenCredential credential) {
        return this;
    }

    /**
     * Sets the proxy configuration.
     *
     * @param proxyConfiguration the proxy configuration
     * @return this builder
     */
    public EventProcessorBuilder proxyConfiguration(ProxyConfiguration proxyConfiguration) {
        return this;
    }

    /**
     * Sets the scheduler.
     *
     * @param scheduler the scheduler
     * @return this builder
     */
    public EventProcessorBuilder scheduler(Scheduler scheduler) {
        return this;
    }

    /**
     * Sets the transport type.
     *
     * @param transport the transport type
     * @return this builder
     */
    public EventProcessorBuilder transportType(TransportType transport) {
        return this;
    }

    /**
     * Sets the timeout.
     *
     * @param timeout the timeout
     * @return this builder
     */
    public EventProcessorBuilder timeout(Duration timeout) {
        return this;
    }

    /**
     * Sets the retry policy.
     *
     * @param retry the retry policy
     * @return this builder
     */
    public EventProcessorBuilder retry(Retry retry) {
        return this;
    }

    /**
     * Sets the initial event position.
     *
     * <p>NOTE(review): this takes a {@code Long} while the {@link EventProcessor} constructor
     * takes an {@code EventPosition}; confirm the intended type before finalizing the API.
     *
     * @param initialEventPosition the initial event position
     * @return this builder
     */
    public EventProcessorBuilder initialEventPosition(Long initialEventPosition) {
        return this;
    }

    /**
     * Sets the factory that creates a {@link PartitionProcessor} from a
     * {@link PartitionContext} and a {@link CheckpointManager}.
     *
     * @param partitionProcessorFactory the factory
     * @return this builder
     */
    public EventProcessorBuilder partitionProcessorFactory(
        BiFunction<PartitionContext, CheckpointManager, PartitionProcessor> partitionProcessorFactory) {
        return this;
    }

    /**
     * Intended to build the EventHubAsyncClient and then use it to build the
     * {@link EventProcessor}.
     *
     * @return the event processor; the current stub returns {@code null}
     */
    public EventProcessor buildEventProcessor() {
        return null;
    }
}

Partition Context

/**
 * PartitionContext is handed to the partition processor factory (together with a
 * CheckpointManager) and carries information about the partition the resulting
 * processor will be processing events from.
 *
 * <p>The class name was previously misspelled as {@code PartionContext}, which did not match
 * the {@code PartitionContext} type referenced everywhere else in this document.
 */
class PartitionContext {
    String partitionId;
    String consumerGroupName;
    String eventHubName;

    /**
     * @return the id of the partition, or {@code null} if it has not been set
     */
    public String getPartitionId() {
        return partitionId;
    }

    /**
     * @return the consumer group name, or {@code null} if it has not been set
     */
    public String getConsumerGroupName() {
        return consumerGroupName;
    }

    /**
     * @return the Event Hub name, or {@code null} if it has not been set
     */
    public String getEventHubName() {
        return eventHubName;
    }
}

Partition Processor

/**
 * Callbacks invoked for a single partition: initialization, event delivery, error
 * notification and shutdown. Every callback returns a {@code Mono<Void>}.
 */
interface PartitionProcessor {

    /**
     * Called before Event Processor starts processing a new partition.
     *
     * @return a {@code Mono} for the initialization work
     */
    Mono<Void> initialize();

    /**
     * Called when Event Processor is stopped or if the
     * ownership of this partition is lost.
     *
     * @return a {@code Mono} for the cleanup work
     */
    Mono<Void> close();

    /**
     * The events will be processed in this method when they arrive.
     *
     * @param eventData the stream of events received from the partition
     * @return a {@code Mono} for the processing work
     */
    Mono<Void> processEvents(Flux<EventData> eventData);

    /**
     * Called when there is an error in the underlying receiver.
     *
     * @param throwable the error that was raised
     * @return a {@code Mono} for the error-handling work
     */
    Mono<Void> processError(Throwable throwable);
}

Checkpoint manager

/**
 * Handed to a partition processor so it can record checkpoints for its partition.
 *
 * <p>NOTE(review): API sketch - neither field is assigned anywhere in this class and both
 * {@code updateCheckpoint} overloads currently return an empty {@code Mono} without
 * forwarding anything.
 */
public class CheckpointManager {
    private PartitionContext partitionContext;
    
    // The update checkpoint methods in this class are intended to forward the request to
    // the underlying partition manager (not implemented in this sketch).
    private PartitionManager partitionManager;

    /**
     * Updates a checkpoint using the event data.
     *
     * @param eventData the event whose position should be checkpointed
     * @return a {@code Mono} for the update; the current stub returns an empty one
     */
    public Mono<Void> updateCheckpoint(EventData eventData){
        return Mono.empty();
    }

    /**
     * Updates a checkpoint using the given sequence number and offset.
     *
     * @param sequenceNumber sequence number to checkpoint
     * @param offset offset to checkpoint
     * @return a {@code Mono} for the update; the current stub returns an empty one
     */
    public Mono<Void> updateCheckpoint(long sequenceNumber, long offset){
        return Mono.empty();
    }
}

Partition manager

// Passed into the EventProcessor (see its constructor) to manage partition ownership and
// checkpoint creation.
interface PartitionManager {

    /**
     * Called to get the list of all existing partition ownership from the underlying data store.
     * Could return empty results if there is no existing ownership information.
     *
     * @param eventHubName name of the Event Hub
     * @param consumerGroupName name of the consumer group
     * @return the existing partition ownership records, possibly none
     */
    Flux<PartitionOwnership> listOwnership(String eventHubName, String consumerGroupName);

    /**
     * Called to claim ownership of a list of partitions. This will return the list of
     * partitions that were owned successfully.
     *
     * @param requestedPartitionOwnerships the partition ownerships the caller wants to claim
     * @return the partition ownerships that were claimed successfully
     */
    Flux<PartitionOwnership> claimOwnership(List<PartitionOwnership> requestedPartitionOwnerships);

    /**
     * Updates the checkpoint in the data store for a partition.
     *
     * @param checkpoint the checkpoint to store
     * @return a {@code Mono} for the update
     */
    Mono<Void> updateCheckpoint(Checkpoint checkpoint);
}

Partition ownership

/**
 * Ownership record for a single partition. Used by PartitionManager to claim ownership and
 * returned by listOwnership.
 *
 * <p>Fields stay package-private for backward compatibility; the accessors below are what the
 * in-memory partition manager sample calls (getPartitionId, setSequenceNumber, setOffset).
 */
class PartitionOwnership {
    String eventHubName;
    String consumerGroupName;
    String instanceId;
    String partitionId;
    long ownerLevel;
    Long sequenceNumber; // optional
    Long offset; // optional
    Long lastModifiedTime; // optional
    String eTag; // optional

    public String getEventHubName() {
        return eventHubName;
    }

    public void setEventHubName(String eventHubName) {
        this.eventHubName = eventHubName;
    }

    public String getConsumerGroupName() {
        return consumerGroupName;
    }

    public void setConsumerGroupName(String consumerGroupName) {
        this.consumerGroupName = consumerGroupName;
    }

    public String getInstanceId() {
        return instanceId;
    }

    public void setInstanceId(String instanceId) {
        this.instanceId = instanceId;
    }

    public String getPartitionId() {
        return partitionId;
    }

    public void setPartitionId(String partitionId) {
        this.partitionId = partitionId;
    }

    public long getOwnerLevel() {
        return ownerLevel;
    }

    public void setOwnerLevel(long ownerLevel) {
        this.ownerLevel = ownerLevel;
    }

    /**
     * @return the sequence number, or {@code null} when absent (the field is optional)
     */
    public Long getSequenceNumber() {
        return sequenceNumber;
    }

    public void setSequenceNumber(Long sequenceNumber) {
        this.sequenceNumber = sequenceNumber;
    }

    /**
     * @return the offset, or {@code null} when absent (the field is optional)
     */
    public Long getOffset() {
        return offset;
    }

    public void setOffset(Long offset) {
        this.offset = offset;
    }

    /**
     * @return the last modified time, or {@code null} when absent (the field is optional)
     */
    public Long getLastModifiedTime() {
        return lastModifiedTime;
    }

    public void setLastModifiedTime(Long lastModifiedTime) {
        this.lastModifiedTime = lastModifiedTime;
    }

    /**
     * @return the eTag, or {@code null} when absent (the field is optional)
     */
    public String getETag() {
        return eTag;
    }

    public void setETag(String eTag) {
        this.eTag = eTag;
    }
}

Checkpoint

/**
 * Checkpoint data for a single partition. Used by updateCheckpoint in PartitionManager.
 *
 * <p>The fields are private, so without accessors the class could be neither populated nor
 * read; the getters and setters below make it usable (the in-memory partition manager sample
 * reads the partition id, sequence number and offset number through them).
 */
public class Checkpoint {
    private String eventHubName;
    private String consumerGroupName;
    private String instanceId;
    private String partitionId;
    private long sequenceNumber;
    private long offsetNumber;

    public String getEventHubName() {
        return eventHubName;
    }

    public void setEventHubName(String eventHubName) {
        this.eventHubName = eventHubName;
    }

    public String getConsumerGroupName() {
        return consumerGroupName;
    }

    public void setConsumerGroupName(String consumerGroupName) {
        this.consumerGroupName = consumerGroupName;
    }

    public String getInstanceId() {
        return instanceId;
    }

    public void setInstanceId(String instanceId) {
        this.instanceId = instanceId;
    }

    public String getPartitionId() {
        return partitionId;
    }

    public void setPartitionId(String partitionId) {
        this.partitionId = partitionId;
    }

    public long getSequenceNumber() {
        return sequenceNumber;
    }

    public void setSequenceNumber(long sequenceNumber) {
        this.sequenceNumber = sequenceNumber;
    }

    public long getOffsetNumber() {
        return offsetNumber;
    }

    public void setOffsetNumber(long offsetNumber) {
        this.offsetNumber = offsetNumber;
    }
}

Sample customer facing implementation

/**
 * Walks through the customer-facing flow of an {@link EventProcessor}: build it, start it,
 * and finally stop it, logging each step.
 */
public class EventProcessorSample {

    private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessorSample.class);

    public static void main(String[] args) {
        final EventProcessor processor = createProcessor();

        LOGGER.info("Starting event processor");
        // Fire-and-forget: the start signal is subscribed to but not awaited.
        processor.start().subscribe();
        LOGGER.info("Event processor started");

        // do stuff

        LOGGER.info("Stopping event processor");
        // Unlike start, stop is awaited before the final log line.
        processor.stop().block();
        LOGGER.info("Stopped event processor");
    }

    /**
     * Builds the processor used by the sample: an empty connection string and a factory that
     * creates a {@link ConsolePartitionProcessor} per partition.
     */
    private static EventProcessor createProcessor() {
        EventProcessorBuilder builder = new EventProcessorBuilder()
            .connectionString("")
            .partitionProcessorFactory(ConsolePartitionProcessor::new);
        return builder.buildEventProcessor();
    }
}


/**
 * A simple implementation of a {@link PartitionProcessor} that logs the
 * methods called in this implementation to a console.
 */
public class ConsolePartitionProcessor implements PartitionProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePartitionProcessor.class);

    private final PartitionContext partitionContext;
    // Not used by this logging-only sample; kept so the constructor matches the
    // (PartitionContext, CheckpointManager) factory signature.
    private final CheckpointManager checkpointManager;

    /**
     * @param partitionContext describes the partition this processor handles
     * @param checkpointManager checkpoint manager for the partition
     */
    public ConsolePartitionProcessor(PartitionContext partitionContext,
        CheckpointManager checkpointManager) {
        this.partitionContext = partitionContext;
        this.checkpointManager = checkpointManager;
    }

    /**
     * Logs the partition details.
     *
     * @return an empty {@code Mono}
     */
    @Override
    public Mono<Void> initialize() {
        LOGGER.info(
            "Initializing partition processor: event hub name = {}; consumer group name = {}; partition id = {}",
            partitionContext.getEventHubName(), partitionContext.getConsumerGroupName(),
            partitionContext.getPartitionId());
        return Mono.empty();
    }

    /**
     * Logs the partition details.
     *
     * @return an empty {@code Mono}
     */
    @Override
    public Mono<Void> close() {
        LOGGER.info(
            "Closing partition processor event hub name = {}; consumer group name = {}; partition id = {}",
            partitionContext.getEventHubName(), partitionContext.getConsumerGroupName(),
            partitionContext.getPartitionId());
        return Mono.empty();
    }

    /**
     * Logs every event in the stream.
     *
     * <p>The returned {@code Mono} is tied to the event stream: it completes when the stream
     * completes and carries any error the stream raises. Previously the stream was subscribed
     * to internally and an already-completed {@code Mono} was returned, so callers could
     * observe neither completion nor errors. Events are now consumed when the caller
     * subscribes to the returned {@code Mono}.
     *
     * @param eventData the stream of events to log
     * @return a {@code Mono} that completes once all events have been logged
     */
    @Override
    public Mono<Void> processEvents(Flux<EventData> eventData) {
        LOGGER.info("Processing events ");
        return eventData.doOnNext(this::process).then();
    }

    /**
     * Logs the error, including its stack trace.
     *
     * @param throwable the error to log
     * @return an empty {@code Mono}
     */
    @Override
    public Mono<Void> processError(Throwable throwable) {
        // Pass the throwable so the cause is not lost from the log output.
        LOGGER.warn("Error while processing partition", throwable);
        return Mono.empty();
    }

    private void process(EventData event) {
        LOGGER.info("Processing event with sequence number {}", event.sequenceNumber());
    }
}

/**
 * A simple in-memory implementation of a {@link PartitionManager}
 */
public class InMemoryPartitionManager implements PartitionManager {

    private Map<String, PartitionOwnership> partitionOwnershipMap = new ConcurrentHashMap<>();

    @Override
    public Flux<PartitionOwnership> listOwnership(String eventHubName, String consumerGroupName) {
        return Flux.fromStream(partitionOwnershipMap.values().stream());
    }

    @Override
    public Flux<PartitionOwnership> claimOwnership(
        List<PartitionOwnership> requestedPartitionOwnerships) {
        return Flux.fromStream(requestedPartitionOwnerships.stream()
            .filter(partitionOwnership -> !partitionOwnershipMap
                .containsKey(partitionOwnership.getPartitionId()))
            .map(partitionOwnership -> {
                partitionOwnershipMap.put(partitionOwnership.getPartitionId(), partitionOwnership);
                return partitionOwnership;
            }));
    }

    @Override
    public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
        partitionOwnershipMap.get(checkpoint.getPartitionId())
            .setSequenceNumber(checkpoint.getSequenceNumber());
        partitionOwnershipMap.get(checkpoint.getPartitionId())
            .setOffset(checkpoint.getOffsetNumber());
        return Mono.empty();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment