@srnagar
Last active September 26, 2019 17:53
Event Processor API

Event Processor API proposal

Proposal 1

Key changes:

  • Creating an Event Processor instance will use constructor overloads similar to those of Event Hub Client, with two additional parameters:
    • Consumer group name
    • Partition Processor factory
  • start() is overloaded, and one overload takes a PartitionManager. Making the PartitionManager optional helps:
    • Users who are getting started
    • Users who have few partitions and don't need load balancing

User experience

Java API sample

public class Sample {
  public static void main(String[] args) {
    EventProcessor eventProcessor = new EventProcessorBuilder()
                                    .connectionString("")
                                    .consumerGroup("")
                                    .partitionProcessorFactory(SimplePartitionProcessor::new)
                                    .build();
    // Starts the event processor with no checkpointing or load balancing
    eventProcessor.start();
    eventProcessor.stop();
      
    // Starts the event processor with no checkpointing or load balancing but with an error handler 
    eventProcessor.start(errorHandler);
    eventProcessor.stop();
    
    // Create an instance of blobPartitionManager
    PartitionManager partitionManager = new BlobPartitionManager(containerClient);
      
    // Starts the event processor that uses blobs for checkpointing and load balancing
    eventProcessor.start(partitionManager);
    eventProcessor.stop();
     
    // Starts the event processor that uses blobs for checkpointing and load balancing with error handling
    eventProcessor.start(partitionManager, errorHandler);
    eventProcessor.stop();
   
  }
}

/**
* User extends the base abstract class to implement processEvent and optionally 
* override other methods.
*/
public class SimplePartitionProcessor extends PartitionProcessor {

  // We can remove partition context and send individual 
  // properties (Event Hub name, namespace, partitionId, consumerGroup)
  public SimplePartitionProcessor(PartitionContext partitionContext) {
    super(partitionContext);
  }

  @Override
  public void processEvent(EventData eventData) {
    // process event
    this.updateCheckpoint(eventData);
  }
}

JavaScript API sample

// User class extending the base class provided by the library
class SamplePartitionProcessor extends PartitionProcessor {
    async processEvents(events) { console.log(events);}
    async processError(error) { console.log(error); }
    async initialize() { console.log(`Started processing partition: ${this.partitionId}`); }
    async close(reason) { console.log(`Stopped processing partition: ${this.partitionId} for reason ${reason}`); }
}

// Three constructor overloads similar to EventHubClient; the extra arguments are consumerGroupName and the PartitionProcessor class
// Options are not shown, but are similar to what EventHubClientOptions supports, plus max batch size and max wait time per batch
myProcessor = new EventProcessor(connectionstring, consumerGroupName, SamplePartitionProcessor);
myProcessor = new EventProcessor(connectionstring, eventhubName, consumerGroupName, SamplePartitionProcessor);
myProcessor = new EventProcessor(fullyQualifiedNamespace, eventhubName, credentials, consumerGroupName, SamplePartitionProcessor);

// And/or use the EventHubClient, as we have proof that adding more connections doesn't help JS much
myProcessor = new EventProcessor(eventHubClient, consumerGroupName, SamplePartitionProcessor);

// Start without a partition manager; any user attempt to checkpoint will fail
myProcessor.start();

// Start without a partition manager, but with a top-level error handler; any user attempt to checkpoint will fail
myProcessor.start(errorHandler);

// Start by passing partition manager that uses blobs for checkpointing & load balancing
containerClient = new ContainerClient("storage-connection-string", "container-name");
myPartitionManager = new BlobPartitionManager(containerClient);
myProcessor.start(myPartitionManager);

// Start by passing error handler & partition manager that uses blobs for checkpointing & load balancing
myProcessor.start(myPartitionManager, errorHandler);

// Stop the event processor
myProcessor.stop();

Proposal 2

Key changes:

  • Create the Event Processor from the Event Hub Client. The Event Hub Client constructor already takes every parameter needed to create a connection, so the client can pass them along to the Event Processor constructor, which creates connections when required.

  • The client becomes the user's first point of interaction; from it the user creates instances of EventHubProducer, EventHubConsumer or EventProcessor for sending and receiving.

User experience

Java

  public class Sample {
    public static void main(String[] args) {
      EventHubAsyncClient eventHubAsyncClient = new EventHubClientBuilder()
                                      .connectionString("")
                                      .build();
        
      EventProcessor eventProcessor = eventHubAsyncClient.createEventProcessor(consumerGroup, SimplePartitionProcessor::new);
      // Starts receiving events with no checkpointing or load balancing
      eventProcessor.start();
      eventProcessor.stop();

      // Starts the event processor with no checkpointing or load balancing but with an error handler
      eventProcessor.start(errorHandler);
      eventProcessor.stop();

      // Create an instance of blobPartitionManager
      PartitionManager partitionManager = new BlobPartitionManager(containerClient);

      // Starts the event processor that uses blobs for checkpointing and load balancing
      eventProcessor.start(partitionManager);
      eventProcessor.stop();

      // Starts the event processor that uses blobs for checkpointing and load balancing with error handling
      eventProcessor.start(partitionManager, errorHandler);
      eventProcessor.stop();
    }
  }
  
  /**
  * User extends the base abstract class to implement processEvent and optionally 
  * override other methods.
  */
  public class SimplePartitionProcessor extends PartitionProcessor {
    public SimplePartitionProcessor(PartitionContext partitionContext) {
      super(partitionContext);
    }
  
    @Override
    public void processEvent(EventData eventData) {
      // process event
      this.updateCheckpoint(eventData);
    }
  }

JavaScript

// User class extending the base class provided by the library
class SamplePartitionProcessor extends PartitionProcessor {
    async processEvents(events) { console.log(events);}
    async processError(error) { console.log(error); }
    async initialize() { console.log(`Started processing partition: ${this.partitionId}`); }
    async close(reason) { console.log(`Stopped processing partition: ${this.partitionId} for reason ${reason}`); }
}

myClient = new EventHubClient(connectionstring);
myClient = new EventHubClient(connectionstring, eventhubName);
myClient = new EventHubClient(fullyQualifiedNamespace, eventhubName, credentials);

myProcessor = myClient.createEventProcessor(consumerGroup, SamplePartitionProcessor);

// Start without a partition manager; any user attempt to checkpoint will fail
myProcessor.start();

// Start without a partition manager, but with a top-level error handler; any user attempt to checkpoint will fail
myProcessor.start(errorHandler);

// Start by passing partition manager that uses blobs for checkpointing & load balancing
containerClient = new ContainerClient("storage-connection-string", "container-name");
myPartitionManager = new BlobPartitionManager(containerClient);
myProcessor.start(myPartitionManager);

// Start by passing error handler & partition manager that uses blobs for checkpointing & load balancing
myProcessor.start(myPartitionManager, errorHandler);

// Stop the event processor
myProcessor.stop();

Proposal 3

Key changes:

  • No Event Processor
  • Provide send() and receive() options on the client to send and receive events to/from all partitions
  • For users who want to work with a specific partition, change createProducer and createConsumer methods to createPartitionProducer and createPartitionConsumer

User experience

Java

  public class Sample {
    public static void main(String[] args) {
      EventHubAsyncClient eventHubAsyncClient = new EventHubClientBuilder()
                                      .connectionString("")
                                      .build();
				      
      // Starts receiving events with no checkpointing or load balancing
      Disposable disposable = eventHubAsyncClient.receive(consumerGroup, SimplePartitionProcessor::new, options);
      disposable.dispose(); // to stop receiving

      // Starts receiving events with checkpointing and load balancing
      Disposable checkpointingDisposable = eventHubAsyncClient.receive(consumerGroup, SimplePartitionProcessor::new, partitionManager, options);
      checkpointingDisposable.dispose(); // to stop receiving
    }
  }
  
  /**
  * User extends the base abstract class to implement processEvent and optionally 
  * override other methods.
  */
  public class SimplePartitionProcessor extends PartitionProcessor {
    public SimplePartitionProcessor(PartitionContext partitionContext) {
      super(partitionContext);
    }
  
    @Override
    public void processEvent(EventData eventData) {
      // process event
      this.updateCheckpoint(eventData);
    }
  }

JavaScript

// 3 constructor overloads for EventHubClient, options not shown
myClient = new EventHubClient(connectionstring);
myClient = new EventHubClient(connectionstring, eventhubName);
myClient = new EventHubClient(fullyQualifiedNamespace, eventhubName, credentials);

// Send without setting partitionId
await myClient.send(myBatch);

// Send by setting partitionId
myPartitionProducer = myClient.createPartitionProducer(partitionId);
await myPartitionProducer.send(myBatch);

// Receive without caring about partitionId. 
// This spins up the EPH and starts receiving without checkpointing or load balancing support
myReceiver = myClient.receive(consumerGroupName, SamplePartitionProcessor);
await myReceiver.stop();

// Receive without caring about partitionId. 
// This spins up the EPH and starts receiving with checkpointing and load balancing support
containerClient = new ContainerClient("storage-connection-string", "container-name");
myPartitionManager = new BlobPartitionManager(containerClient);
myReceiver = myClient.receive(consumerGroupName, SamplePartitionProcessor, myPartitionManager);
await myReceiver.stop();

// Receive from a particular partitionId.
myPartitionConsumer = myClient.createPartitionConsumer(consumerGroupName, partitionId, position, options);
myEvents = await myPartitionConsumer.receiveBatch(10);

API types

Creating an Event Processor

An Event Processor is created the same way as an Event Hub Client, i.e. through a builder.

public class EventProcessorBuilder {
  
  // The following methods are available on EventHubClientBuilder
  public EventProcessorBuilder connectionString(String connectionString);
  public EventProcessorBuilder connectionString(String connectionString, String eventHubName);
  public EventProcessorBuilder transportType(TransportType transportType);
  public EventProcessorBuilder credential(String namespace, String eventHubName, TokenCredential credential);
  public EventProcessorBuilder configuration(Configuration configuration);
  public EventProcessorBuilder proxyConfiguration(ProxyConfiguration proxyConfiguration);
  public EventProcessorBuilder retry(RetryOptions retryOptions);
  public EventProcessorBuilder scheduler(Scheduler scheduler);
  
  // New methods added for Event Processor
  public EventProcessorBuilder consumerGroup(String consumerGroup); // also required for creating an EventHubConsumer
  public EventProcessorBuilder partitionProcessorFactory(Function<PartitionContext, PartitionProcessor> partitionProcessorFactory); 
  
  // build the processor from available params or throw error if required params are missing
  public EventProcessor build(); 
}
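
The `build()` contract above (build from the available parameters, or throw when a required one is missing) could be sketched roughly as follows. `MiniProcessorBuilder` and `MiniProcessor` are hypothetical stand-ins rather than the proposed types. The factory is simplified to take a partition id string, and treating all three parameters as required is an assumption.

```java
import java.util.function.Function;

public class BuilderValidationSketch {

    /** Hypothetical stand-in for the object produced by build(). */
    static final class MiniProcessor {
        final String connectionString;
        final String consumerGroup;
        final Function<String, Object> partitionProcessorFactory;

        MiniProcessor(String connectionString, String consumerGroup,
                      Function<String, Object> partitionProcessorFactory) {
            this.connectionString = connectionString;
            this.consumerGroup = consumerGroup;
            this.partitionProcessorFactory = partitionProcessorFactory;
        }
    }

    /** Hypothetical builder; only three of the proposed setters are mirrored. */
    static final class MiniProcessorBuilder {
        private String connectionString;
        private String consumerGroup;
        private Function<String, Object> partitionProcessorFactory;

        MiniProcessorBuilder connectionString(String connectionString) {
            this.connectionString = connectionString;
            return this;
        }

        MiniProcessorBuilder consumerGroup(String consumerGroup) {
            this.consumerGroup = consumerGroup;
            return this;
        }

        MiniProcessorBuilder partitionProcessorFactory(Function<String, Object> factory) {
            this.partitionProcessorFactory = factory;
            return this;
        }

        MiniProcessor build() {
            // Assumption: all three parameters are required.
            require(connectionString, "connectionString");
            require(consumerGroup, "consumerGroup");
            require(partitionProcessorFactory, "partitionProcessorFactory");
            return new MiniProcessor(connectionString, consumerGroup, partitionProcessorFactory);
        }

        private static void require(Object value, String name) {
            if (value == null) {
                throw new IllegalStateException(name + " must be set before build()");
            }
        }
    }

    public static void main(String[] args) {
        try {
            // consumerGroup and the factory are missing, so build() should fail.
            new MiniProcessorBuilder().connectionString("placeholder").build();
        } catch (IllegalStateException e) {
            System.out.println("build() failed: " + e.getMessage());
        }
    }
}
```

Failing in `build()` rather than in each setter keeps the setters order-independent, which is one common reason to choose a builder in the first place.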

Event Processor methods

public class EventProcessor {
  EventProcessor(constructor args); // package private
  public void start();
  public void start(PartitionManager partitionManager);
  public void stop();
}
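
The samples note that checkpointing fails when the processor is started without a partition manager. A minimal sketch of that behaviour is shown here, under stated assumptions: `CheckpointStore`, `InMemoryCheckpointStore` and `SketchEventProcessor` are hypothetical names, and the proposal does not define what methods a PartitionManager exposes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class OptionalPartitionManagerSketch {

    /** Hypothetical checkpoint-only view of a partition manager. */
    interface CheckpointStore {
        void updateCheckpoint(String partitionId, long sequenceNumber);
    }

    /** In-memory store, standing in for something like a blob-backed manager. */
    static final class InMemoryCheckpointStore implements CheckpointStore {
        final Map<String, Long> checkpoints = new ConcurrentHashMap<>();

        @Override
        public void updateCheckpoint(String partitionId, long sequenceNumber) {
            checkpoints.put(partitionId, sequenceNumber);
        }
    }

    /** Hypothetical processor exposing the two start() overloads. */
    static final class SketchEventProcessor {
        private CheckpointStore store; // stays null when started without a store
        private boolean running;

        void start() {
            start(null);
        }

        void start(CheckpointStore store) {
            this.store = store;
            this.running = true;
        }

        void stop() {
            this.running = false;
            this.store = null;
        }

        boolean isRunning() {
            return running;
        }

        void checkpoint(String partitionId, long sequenceNumber) {
            if (store == null) {
                throw new IllegalStateException("checkpointing needs a partition manager");
            }
            store.updateCheckpoint(partitionId, sequenceNumber);
        }
    }

    public static void main(String[] args) {
        SketchEventProcessor processor = new SketchEventProcessor();

        processor.start(); // no store: checkpoint attempts are rejected
        try {
            processor.checkpoint("0", 42L);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        processor.stop();

        InMemoryCheckpointStore store = new InMemoryCheckpointStore();
        processor.start(store); // with a store: checkpoint is recorded
        processor.checkpoint("0", 42L);
        processor.stop();
        System.out.println(store.checkpoints);
    }
}
```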

Partition Processor

public abstract class PartitionProcessor {
  public PartitionProcessor(PartitionContext partitionContext);
  public void initialize();
  public abstract void processEvent(EventData eventData);
  public void processError(Exception error);
  public void close(CloseReason closeReason);
  public final void updateCheckpoint(EventData eventData); // cannot override
}
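
To make the intended use of these methods concrete, here is a small sketch of a driver calling them in order. The call order (initialize, then processEvent per event, then close) and the routing of exceptions to processError are assumptions, not something the proposal states. `SketchPartitionProcessor` is a stand-in that uses plain strings instead of EventData and CloseReason.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionProcessorLifecycleSketch {

    /** Stand-in mirroring the shape of the proposed PartitionProcessor. */
    abstract static class SketchPartitionProcessor {
        private String lastCheckpoint;

        public void initialize() { }

        public abstract void processEvent(String eventData);

        public void processError(Exception error) { }

        public void close(String closeReason) { }

        // final, so subclasses can call it but cannot override it
        public final void updateCheckpoint(String eventData) {
            this.lastCheckpoint = eventData;
        }

        public final String lastCheckpoint() {
            return lastCheckpoint;
        }
    }

    /** Hypothetical driver: initialize, one processEvent per event, then close. */
    static void run(SketchPartitionProcessor processor, List<String> events) {
        processor.initialize();
        for (String event : events) {
            try {
                processor.processEvent(event);
            } catch (Exception e) {
                // Assumption: user-code failures are reported through processError.
                processor.processError(e);
            }
        }
        processor.close("SHUTDOWN");
    }

    /** Example subclass that records every callback it receives. */
    static final class RecordingProcessor extends SketchPartitionProcessor {
        final List<String> calls = new ArrayList<>();

        @Override
        public void initialize() {
            calls.add("initialize");
        }

        @Override
        public void processEvent(String eventData) {
            if (eventData.isEmpty()) {
                throw new IllegalArgumentException("empty event");
            }
            calls.add("processEvent:" + eventData);
            updateCheckpoint(eventData);
        }

        @Override
        public void processError(Exception error) {
            calls.add("processError:" + error.getMessage());
        }

        @Override
        public void close(String closeReason) {
            calls.add("close:" + closeReason);
        }
    }

    public static void main(String[] args) {
        RecordingProcessor processor = new RecordingProcessor();
        run(processor, Arrays.asList("a", "", "b"));
        System.out.println(processor.calls);
        System.out.println("last checkpoint: " + processor.lastCheckpoint());
    }
}
```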

PartitionContext

I know we had decided not to use a partition context, but it simplifies the constructor, makes it easy to create a factory, and can also be used to get checkpoint information from the store.

public class PartitionContext {
  public String getPartitionId();
  public String getEventHubName();
  public String getNamespace();
  public String getConsumerGroup();
}
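
The factory argument above can be illustrated with a short sketch: a single-argument constructor taking the context lets a constructor reference serve as the factory, which the library would then call once per partition. `SketchContext`, `SketchProcessor` and `createProcessors` are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class PartitionFactorySketch {

    /** Stand-in with the same four getters as the proposed PartitionContext. */
    static final class SketchContext {
        private final String partitionId;
        private final String eventHubName;
        private final String namespace;
        private final String consumerGroup;

        SketchContext(String partitionId, String eventHubName, String namespace, String consumerGroup) {
            this.partitionId = partitionId;
            this.eventHubName = eventHubName;
            this.namespace = namespace;
            this.consumerGroup = consumerGroup;
        }

        String getPartitionId() { return partitionId; }
        String getEventHubName() { return eventHubName; }
        String getNamespace() { return namespace; }
        String getConsumerGroup() { return consumerGroup; }
    }

    /** Stand-in processor that only remembers the context it was built with. */
    static class SketchProcessor {
        final SketchContext context;

        SketchProcessor(SketchContext context) {
            this.context = context;
        }
    }

    /** Hypothetical helper: invokes the factory once per partition id. */
    static Map<String, SketchProcessor> createProcessors(
            String namespace, String eventHubName, String consumerGroup,
            List<String> partitionIds,
            Function<SketchContext, SketchProcessor> factory) {
        Map<String, SketchProcessor> processors = new LinkedHashMap<>();
        for (String partitionId : partitionIds) {
            SketchContext context =
                new SketchContext(partitionId, eventHubName, namespace, consumerGroup);
            processors.put(partitionId, factory.apply(context));
        }
        return processors;
    }

    public static void main(String[] args) {
        // The constructor reference works because the constructor takes only the context.
        Map<String, SketchProcessor> processors = createProcessors(
            "my-namespace", "my-hub", "$Default", Arrays.asList("0", "1"), SketchProcessor::new);
        for (SketchProcessor processor : processors.values()) {
            System.out.println("processor for partition " + processor.context.getPartitionId());
        }
    }
}
```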

Proposal 2

public class EventHubClient {
	// Existing methods
	public EventHubProducer createProducer();
	public EventHubProducer createProducer(EventHubProducerOptions eventHubProducerOptions);
	public EventHubConsumer createConsumer(String consumerGroup, String partitionId, EventPosition eventPosition);
	public EventHubConsumer createConsumer(String consumerGroup, String partitionId, EventPosition eventPosition, EventHubConsumerOptions eventHubConsumerOptions);
    	
	// New methods
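	public EventProcessor createEventProcessor(String consumerGroup, Function<PartitionContext, PartitionProcessor> partitionProcessorFactory, EventProcessorOptions options); // creates a simple processor without load balancing and checkpointing
	public EventProcessor createEventProcessor(String consumerGroup, Function<PartitionContext, PartitionProcessor> partitionProcessorFactory, PartitionManager partitionManager, EventProcessorOptions options); // creates a processor with load balancing and checkpointing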
}

Proposal 3

public class EventHubAsyncClient {
    // constructor
    
    // Send without setting the partition
    public Mono<Void> send(Flux<EventData> events);
    
    // Receive without setting the partition which is equivalent to event processor's start()
    public Disposable receive(String consumerGroup, 
                              Function<PartitionContext, PartitionProcessor> partitionProcessorFactory, 
                              EventProcessorOptions options);
    public Disposable receive(String consumerGroup, 
                              Function<PartitionContext, PartitionProcessor> partitionProcessorFactory, 
                              PartitionManager partitionManager, 
                              EventProcessorOptions options);
    
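    // Partition-specific producer (renamed from createProducer, per the key changes above)
    public EventHubProducer createPartitionProducer(String partitionId);
    
    // Partition-specific consumer (renamed from createConsumer, per the key changes above)
    public EventHubConsumer createPartitionConsumer(String consumerGroup, String partitionId, EventPosition eventPosition);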
        
    // Other methods
    public Mono<EventHubProperties> getProperties();
    public Mono<PartitionProperties> getPartitionProperties();
    public Flux<String> getPartitionIds();
}
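
In Proposal 3, `receive()` returns a handle instead of exposing `stop()`, and disposing the handle ends delivery. The sketch below shows that shape with a synchronous, single-subscriber source. `Handle` and `Source` are hypothetical stand-ins; the real proposal uses `Disposable`, and actual delivery would be asynchronous.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ReceiveHandleSketch {

    /** Hypothetical stand-in for the Disposable returned by receive(). */
    interface Handle {
        void dispose();

        boolean isDisposed();
    }

    /** Hypothetical event source that pushes events to at most one subscriber. */
    static final class Source {
        private Consumer<String> handler;

        Handle receive(Consumer<String> onEvent) {
            this.handler = onEvent;
            return new Handle() {
                private boolean disposed;

                @Override
                public void dispose() {
                    disposed = true;
                    handler = null; // detach the subscriber from the enclosing Source
                }

                @Override
                public boolean isDisposed() {
                    return disposed;
                }
            };
        }

        /** Delivers the event if a subscriber is attached; otherwise drops it. */
        void publish(String event) {
            if (handler != null) {
                handler.accept(event);
            }
        }
    }

    public static void main(String[] args) {
        Source source = new Source();
        List<String> received = new ArrayList<>();

        Handle handle = source.receive(received::add);
        source.publish("first");
        handle.dispose(); // stop receiving
        source.publish("second"); // dropped, nobody is subscribed

        System.out.println(received);
    }
}
```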