Key changes:
- Creating an Event Processor instance uses constructor overloads similar to those of Event Hub Client, with two additional parameters:
  - Consumer group name
  - Partition Processor factory
- start() has two overloads, one of which takes a PartitionManager. Having an overload without a PartitionManager helps:
  - Getting started
  - Users who have few partitions and don't want load balancing
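The relationship between the two start() overloads can be sketched roughly as follows. This is only an illustration under stated assumptions: `PartitionManagerSketch`, `EventProcessorSketch`, `isRunning()` and `canCheckpoint()` are hypothetical names invented for this example and are not part of the proposed API.

```java
// Hypothetical stand-in for the proposed PartitionManager (assumption, not the library type).
interface PartitionManagerSketch {
    String storeName();
}

// Hypothetical sketch of an event processor exposing the two start() overloads.
class EventProcessorSketch {
    private PartitionManagerSketch partitionManager; // null when started without one
    private boolean running;

    // Starts with no partition manager: no checkpointing or load balancing.
    public void start() {
        this.partitionManager = null;
        this.running = true;
    }

    // Starts with a partition manager: checkpointing and load balancing become available.
    public void start(PartitionManagerSketch partitionManager) {
        this.partitionManager = partitionManager;
        this.running = true;
    }

    public void stop() {
        this.running = false;
    }

    public boolean isRunning() {
        return running;
    }

    // Checkpointing is only possible while running with a partition manager.
    public boolean canCheckpoint() {
        return running && partitionManager != null;
    }
}
```

In this sketch a user who only wants to try things out calls `start()` and never has to configure storage, while `start(partitionManager)` opts in to checkpointing.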
public class Sample {
public static void main(String[] args) {
EventProcessor eventProcessor = new EventProcessorBuilder()
.connectionString("")
.consumerGroup("")
.partitionProcessorFactory(SimplePartitionProcessor::new)
.build();
// Starts the event processor with no checkpointing or load balancing
eventProcessor.start();
eventProcessor.stop();
// Starts the event processor with no checkpointing or load balancing but with an error handler
eventProcessor.start(errorHandler);
eventProcessor.stop();
// Create an instance of blobPartitionManager
PartitionManager partitionManager = new BlobPartitionManager(containerClient);
// Starts the event processor that uses blobs for checkpointing and load balancing
eventProcessor.start(partitionManager);
eventProcessor.stop();
// Starts the event processor that uses blobs for checkpointing and load balancing with error handling
eventProcessor.start(partitionManager, errorHandler);
eventProcessor.stop();
}
}
/**
* User extends the base abstract class to implement processEvent and optionally
* override other methods.
*/
public class SimplePartitionProcessor extends PartitionProcessor {
// We can remove partition context and send individual
// properties (Event Hub name, namespace, partitionId, consumerGroup)
public SimplePartitionProcessor(PartitionContext partitionContext) {
super(partitionContext);
}
@Override
public void processEvent(EventData eventData) {
// process event
this.updateCheckpoint(eventData);
}
}
// User class extending the base class provided by the library
class SamplePartitionProcessor extends PartitionProcessor {
async processEvents(events) { console.log(events);}
async processError(error) { console.log(error); }
async initialize() { console.log(`Started processing partition: ${this.partitionId}`); }
async close(reason) { console.log(`Stopped processing partition: ${this.partitionId} for reason ${reason}`); }
}
// 3 constructor overloads similar to EventHubClient, extra arguments are consumerGroupName, and PartitionProcessor
// Options are not shown, but are similar to what EventHubClientOptions support + max batch size, max wait time per batch
myProcessor = new EventProcessor(connectionstring, consumerGroupName, SamplePartitionProcessor);
myProcessor = new EventProcessor(connectionstring, eventhubName, consumerGroupName, SamplePartitionProcessor);
myProcessor = new EventProcessor(fullyQualifiedNamespace, eventhubName, credentials, consumerGroupName, SamplePartitionProcessor);
// And/or use the EventHubClient, as we have proof that adding more connections doesn't help JS much
myProcessor = new EventProcessor(eventHubclient, consumerGroupName, SamplePartitionProcessor);
// Start without partition manager, user attempt to checkpoint will fail
myProcessor.start();
// Start without partition manager, but with top level error handler, user attempt to checkpoint will fail
myProcessor.start(errorHandler);
// Start by passing partition manager that uses blobs for checkpointing & load balancing
containerClient = new ContainerClient("storage-connection-string", "container-name");
myPartitionManager = new BlobPartitionManager(containerClient);
myProcessor.start(myPartitionManager);
// Start by passing error handler & partition manager that uses blobs for checkpointing & load balancing
myProcessor.start(myPartitionManager, errorHandler);
// Stop the event processor
myProcessor.stop();
Key changes:
- Create Event Processor from Event Hub Client. The constructor of Event Hub Client takes all the parameters needed to create a connection, and these can be passed along to the Event Processor constructor to create connections when required.
- The client becomes the user's first interaction point and can create instances of EventHubProducer, EventHubConsumer or EventProcessor for sending and receiving.
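A rough sketch of the idea that the client owns the connection parameters and hands them to the processors it creates. All names here (`ClientSketch`, `ProcessorSketch`) are hypothetical, and the factory is simplified to a function from partition id to a processor label rather than the proposed PartitionProcessor factory.

```java
import java.util.function.Function;

// Hypothetical processor that records the settings it was created with.
final class ProcessorSketch {
    final String connectionString;
    final String consumerGroup;
    final Function<String, String> partitionProcessorFactory;

    ProcessorSketch(String connectionString, String consumerGroup,
                    Function<String, String> partitionProcessorFactory) {
        this.connectionString = connectionString;
        this.consumerGroup = consumerGroup;
        this.partitionProcessorFactory = partitionProcessorFactory;
    }
}

// Hypothetical client: the single entry point that knows how to connect.
final class ClientSketch {
    private final String connectionString;

    ClientSketch(String connectionString) {
        this.connectionString = connectionString;
    }

    // The client forwards its own connection settings, so the caller
    // only supplies what is specific to the processor.
    ProcessorSketch createEventProcessor(String consumerGroup,
                                         Function<String, String> partitionProcessorFactory) {
        return new ProcessorSketch(connectionString, consumerGroup, partitionProcessorFactory);
    }
}
```

The design point is that connection details are specified once, on the client, and never repeated when creating a processor.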
public class Sample {
public static void main(String[] args) {
EventHubAsyncClient eventHubAsyncClient = new EventHubClientBuilder()
.connectionString("")
.build();
EventProcessor eventProcessor = eventHubAsyncClient.createEventProcessor(consumerGroup, SimplePartitionProcessor::new);
// Starts receiving events with no checkpointing or load balancing
eventProcessor.start();
eventProcessor.stop();
// Starts the event processor with no checkpointing or load balancing but with an error handler
eventProcessor.start(errorHandler);
eventProcessor.stop();
// Create an instance of blobPartitionManager
PartitionManager partitionManager = new BlobPartitionManager(containerClient);
// Starts the event processor that uses blobs for checkpointing and load balancing
eventProcessor.start(partitionManager);
eventProcessor.stop();
// Starts the event processor that uses blobs for checkpointing and load balancing with error handling
eventProcessor.start(partitionManager, errorHandler);
eventProcessor.stop();
}
}
/**
* User extends the base abstract class to implement processEvent and optionally
* override other methods.
*/
public class SimplePartitionProcessor extends PartitionProcessor {
public SimplePartitionProcessor(PartitionContext partitionContext) {
super(partitionContext);
}
@Override
public void processEvent(EventData eventData) {
// process event
this.updateCheckpoint(eventData);
}
}
// User class extending the base class provided by the library
class SamplePartitionProcessor extends PartitionProcessor {
async processEvents(events) { console.log(events);}
async processError(error) { console.log(error); }
async initialize() { console.log(`Started processing partition: ${this.partitionId}`); }
async close(reason) { console.log(`Stopped processing partition: ${this.partitionId} for reason ${reason}`); }
}
myClient = new EventHubClient(connectionstring);
myClient = new EventHubClient(connectionstring, eventhubName);
myClient = new EventHubClient(fullyQualifiedNamespace, eventhubName, credentials);
myProcessor = myClient.createEventProcessor(consumerGroup, SamplePartitionProcessor);
// Start without partition manager, user attempt to checkpoint will fail
myProcessor.start();
// Start without partition manager, but with top level error handler, user attempt to checkpoint will fail
myProcessor.start(errorHandler);
// Start by passing partition manager that uses blobs for checkpointing & load balancing
containerClient = new ContainerClient("storage-connection-string", "container-name");
myPartitionManager = new BlobPartitionManager(containerClient);
myProcessor.start(myPartitionManager);
// Start by passing error handler & partition manager that uses blobs for checkpointing & load balancing
myProcessor.start(myPartitionManager, errorHandler);
// Stop the event processor
myProcessor.stop();
Key changes:
- No Event Processor
- Provide send() and receive() options on the client to send and receive events to/from all partitions
- For users who want to work with a specific partition, change createProducer and createConsumer methods to createPartitionProducer and createPartitionConsumer
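The "receive from all partitions and get back a handle to stop" shape can be sketched as below. This is a simplified illustration, not the proposed implementation: `StopHandle` stands in for a Disposable-like type, `AllPartitionsClientSketch` and `activeProcessors()` are hypothetical, and the factory maps a partition id to a label instead of a real partition processor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for a Disposable-like handle returned by receive().
interface StopHandle {
    void dispose();
}

// Hypothetical client that fans receive() out to every partition.
final class AllPartitionsClientSketch {
    private final List<String> partitionIds;
    private final List<String> active = new ArrayList<>();

    AllPartitionsClientSketch(List<String> partitionIds) {
        this.partitionIds = new ArrayList<>(partitionIds);
    }

    // Creates one processor per partition via the factory and
    // returns a handle that stops all of them at once.
    StopHandle receive(String consumerGroup, Function<String, String> processorFactory) {
        for (String partitionId : partitionIds) {
            active.add(processorFactory.apply(partitionId));
        }
        return active::clear;
    }

    // Snapshot of the processors currently receiving.
    List<String> activeProcessors() {
        return new ArrayList<>(active);
    }
}
```

The caller never names a partition; stopping is a single `dispose()` on the returned handle.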
public class Sample {
public static void main(String[] args) {
EventHubAsyncClient eventHubAsyncClient = new EventHubClientBuilder()
.connectionString("")
.build();
// Starts receiving events with no checkpointing or load balancing
Disposable disposable = eventHubAsyncClient.receive(consumerGroup, SimplePartitionProcessor::new, options);
disposable.dispose(); // to stop receiving
// Starts receiving events with checkpointing and load balancing
disposable = eventHubAsyncClient.receive(consumerGroup, SimplePartitionProcessor::new, partitionManager, options);
disposable.dispose();
}
}
/**
* User extends the base abstract class to implement processEvent and optionally
* override other methods.
*/
public class SimplePartitionProcessor extends PartitionProcessor {
public SimplePartitionProcessor(PartitionContext partitionContext) {
super(partitionContext);
}
@Override
public void processEvent(EventData eventData) {
// process event
this.updateCheckpoint(eventData);
}
}
// 3 constructor overloads for EventHubClient, options not shown
myClient = new EventHubClient(connectionstring);
myClient = new EventHubClient(connectionstring, eventhubName);
myClient = new EventHubClient(fullyQualifiedNamespace, eventhubName, credentials);
// Send without setting partitionId
await myClient.send(myBatch);
// Send by setting partitionId
myPartitionProducer = myClient.createPartitionProducer(partitionId);
await myPartitionProducer.send(myBatch);
// Receive without caring about partitionId.
// This spins up the EPH and starts receiving without checkpointing or load balancing support
myReceiver = myClient.receive(consumerGroupName, SamplePartitionProcessor);
await myReceiver.stop();
// Receive without caring about partitionId.
// This spins up the EPH and starts receiving with checkpointing and load balancing support
containerClient = new ContainerClient("storage-connection-string", "container-name");
myPartitionManager = new BlobPartitionManager(containerClient);
myReceiver = myClient.receive(consumerGroupName, SamplePartitionProcessor, myPartitionManager);
await myReceiver.stop();
// Receive from a particular partitionId.
myPartitionConsumer = myClient.createPartitionConsumer(consumerGroupName, partitionId, position, options);
myEvents = await myPartitionConsumer.receiveBatch(10);
Creating an Event Processor is similar to creating an Event Hub Client, i.e. it follows the builder pattern.
public class EventProcessorBuilder {
// The following methods are available on EventHubClientBuilder
public EventProcessorBuilder connectionString(String connectionString);
public EventProcessorBuilder connectionString(String connectionString, String eventHubName);
public EventProcessorBuilder transportType(TransportType transportType);
public EventProcessorBuilder credential(String namespace, String eventHubName, TokenCredential credential);
public EventProcessorBuilder configuration(Configuration configuration);
public EventProcessorBuilder proxyConfiguration(ProxyConfiguration proxyConfiguration);
public EventProcessorBuilder retry(RetryOptions retryOptions);
public EventProcessorBuilder scheduler(Scheduler scheduler);
// New methods added for Event Processor
public EventProcessorBuilder consumerGroup(String consumerGroup); // also required for creating an EventHubConsumer
public EventProcessorBuilder partitionProcessorFactory(Function<PartitionContext, PartitionProcessor> partitionProcessorFactory);
// build the processor from available params or throw error if required params are missing
public EventProcessor build();
}
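The "build or throw if required params are missing" behaviour of the builder could look something like this minimal sketch. `EventProcessorBuilderSketch` and `BuiltProcessorSketch` are hypothetical names, only two of the settings are modelled, and the choice of `IllegalStateException` is an assumption rather than something the proposal specifies.

```java
// Hypothetical result type standing in for the proposed EventProcessor.
final class BuiltProcessorSketch {
    final String connectionString;
    final String consumerGroup;

    BuiltProcessorSketch(String connectionString, String consumerGroup) {
        this.connectionString = connectionString;
        this.consumerGroup = consumerGroup;
    }
}

// Hypothetical builder modelling two required settings.
final class EventProcessorBuilderSketch {
    private String connectionString;
    private String consumerGroup;

    EventProcessorBuilderSketch connectionString(String connectionString) {
        this.connectionString = connectionString;
        return this; // return the builder to allow chaining
    }

    EventProcessorBuilderSketch consumerGroup(String consumerGroup) {
        this.consumerGroup = consumerGroup;
        return this;
    }

    // Validates required settings before constructing the processor.
    BuiltProcessorSketch build() {
        if (connectionString == null) {
            throw new IllegalStateException("connectionString is required");
        }
        if (consumerGroup == null) {
            throw new IllegalStateException("consumerGroup is required");
        }
        return new BuiltProcessorSketch(connectionString, consumerGroup);
    }
}
```

Deferring validation to `build()` lets callers set options in any order while still failing early, before the processor is started.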
public class EventProcessor {
EventProcessor(constructor args); // package private
public void start();
public void start(PartitionManager partitionManager);
public void stop();
}
public abstract class PartitionProcessor {
public PartitionProcessor(PartitionContext partitionContext);
public void initialize();
public abstract void processEvent(EventData eventData);
public void processError(Exception error);
public void close(CloseReason closeReason);
public final void updateCheckpoint(EventData eventData); // cannot override
}
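To illustrate the shape of the abstract class above (optional hooks with default no-op bodies, one abstract method, and a final checkpoint method), here is a simplified sketch. It uses `String` in place of EventData, CloseReason and PartitionContext, and records checkpoints in an in-memory list; `PartitionProcessorSketch`, `CheckpointEveryEvent` and `checkpoints()` are hypothetical names for this example only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified base class mirroring the proposed PartitionProcessor.
abstract class PartitionProcessorSketch {
    private final String partitionId;
    private final List<String> checkpoints = new ArrayList<>();

    protected PartitionProcessorSketch(String partitionId) {
        this.partitionId = partitionId;
    }

    // Optional hooks: no-op by default, subclasses may override.
    public void initialize() { }
    public void processError(Exception error) { }
    public void close(String closeReason) { }

    // The only method a subclass must implement.
    public abstract void processEvent(String eventData);

    // final: subclasses can call this but cannot change how checkpoints are recorded.
    public final void updateCheckpoint(String eventData) {
        checkpoints.add(partitionId + ":" + eventData);
    }

    public List<String> checkpoints() {
        return new ArrayList<>(checkpoints);
    }
}

// Example user class: checkpoints after every event.
class CheckpointEveryEvent extends PartitionProcessorSketch {
    CheckpointEveryEvent(String partitionId) {
        super(partitionId);
    }

    @Override
    public void processEvent(String eventData) {
        updateCheckpoint(eventData);
    }
}
```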
I know we had decided not to use partition context, but it simplifies the constructor, makes it easy to create a factory, and can also be used to get checkpoint information from the store.
public class PartitionContext {
public String getPartitionId();
public String getEventHubName();
public String getNamespace();
public String getConsumerGroup();
}
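A small sketch of why a single context object keeps the factory simple: with one constructor argument, a constructor reference fits `Function<Context, Processor>` directly. The types below are hypothetical stand-ins, and the checkpoint key layout (`namespace/eventHub/consumerGroup/partitionId`) is purely an assumption made for illustration.

```java
import java.util.function.Function;

// Hypothetical stand-in for the proposed PartitionContext.
final class PartitionContextSketch {
    private final String partitionId;
    private final String eventHubName;
    private final String namespace;
    private final String consumerGroup;

    PartitionContextSketch(String partitionId, String eventHubName,
                           String namespace, String consumerGroup) {
        this.partitionId = partitionId;
        this.eventHubName = eventHubName;
        this.namespace = namespace;
        this.consumerGroup = consumerGroup;
    }

    String getPartitionId() { return partitionId; }
    String getEventHubName() { return eventHubName; }
    String getNamespace() { return namespace; }
    String getConsumerGroup() { return consumerGroup; }
}

// Hypothetical processor that takes only the context in its constructor.
final class ContextAwareProcessor {
    private final PartitionContextSketch context;

    ContextAwareProcessor(PartitionContextSketch context) {
        this.context = context;
    }

    // Assumed key layout for looking up checkpoint information in a store.
    String checkpointKey() {
        return context.getNamespace() + "/" + context.getEventHubName() + "/"
            + context.getConsumerGroup() + "/" + context.getPartitionId();
    }
}

final class FactoryDemo {
    // A one-argument constructor matches Function directly as a constructor reference.
    static final Function<PartitionContextSketch, ContextAwareProcessor> FACTORY =
        ContextAwareProcessor::new;
}
```

Passing the four properties individually would instead need a custom four-argument functional interface for the factory.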
public class EventHubClient {
// Existing methods
public EventHubProducer createProducer();
public EventHubProducer createProducer(EventHubProducerOptions eventHubProducerOptions);
public EventHubConsumer createConsumer(String consumerGroup, String partitionId, EventPosition eventPosition);
public EventHubConsumer createConsumer(String consumerGroup, String partitionId, EventPosition eventPosition, EventHubConsumerOptions eventHubConsumerOptions);
// New methods
public EventProcessor createEventProcessor(consumerGroup, partitionProcessorFactory, options); // creates a simple processor without load balancing and checkpointing
public EventProcessor createEventProcessor(consumerGroup, partitionProcessorFactory, partitionManager, options); // creates a processor with load balancing and checkpointing
}
public class EventHubAsyncClient {
// constructor
// Send without setting the partition
public Mono<Void> send(Flux<EventData> events);
// Receive without setting the partition which is equivalent to event processor's start()
public Disposable receive(String consumerGroup,
Function<PartitionContext, PartitionProcessor> partitionProcessorFactory,
EventProcessorOptions options);
public Disposable receive(String consumerGroup,
Function<PartitionContext, PartitionProcessor> partitionProcessorFactory,
PartitionManager partitionManager,
EventProcessorOptions options);
// Partition specific producer
public EventHubProducer createPartitionProducer(String partitionId);
// Partition specific consumer
public EventHubConsumer createPartitionConsumer(String consumerGroup, String partitionId, EventPosition eventPosition);
// Other methods
public Mono<EventHubProperties> getProperties();
public Mono<PartitionProperties> getPartitionProperties();
public Flux<String> getPartitionIds();
}