
@srnagar
Last active April 23, 2020 19:22
Send API for Event Hub Producer Client

Smart send API Options

Today, the Event Hubs producer client requires users to create an EventDataBatch, tryAdd() events to it, and send the batch when they are ready or when the next event does not fit into the batch.

While this API helps ensure that each batch meets the size restrictions of the AMQP link, it is cumbersome when the customer is confident that their set of events fits within those restrictions. There has also been feedback that the client should be smart enough to batch events on its own, without the customer having to do it.
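For reference, the pattern users write today looks roughly like the following. This is a self-contained sketch rather than SDK code: SketchBatch and TryAddPattern are hypothetical stand-ins for EventDataBatch and the producer, and an event's size is approximated by its payload length (a real batch also accounts for AMQP overhead).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for EventDataBatch; it only sums payload lengths,
// whereas a real batch also counts AMQP encoding overhead.
class SketchBatch {
    private final int maxSizeInBytes;
    private final List<byte[]> events = new ArrayList<>();
    private int sizeInBytes;

    SketchBatch(int maxSizeInBytes) {
        this.maxSizeInBytes = maxSizeInBytes;
    }

    // Like tryAdd(): returns false instead of throwing when the event does not fit.
    boolean tryAdd(byte[] event) {
        if (sizeInBytes + event.length > maxSizeInBytes) {
            return false;
        }
        events.add(event);
        sizeInBytes += event.length;
        return true;
    }

    int count() {
        return events.size();
    }
}

class TryAddPattern {
    // The loop users write today: fill a batch with tryAdd() and "send" it
    // (here: record it) when the next event does not fit.
    static List<SketchBatch> sendAll(List<byte[]> events, int maxSizeInBytes) {
        List<SketchBatch> sent = new ArrayList<>();
        SketchBatch batch = new SketchBatch(maxSizeInBytes);
        for (byte[] event : events) {
            if (event.length > maxSizeInBytes) {
                throw new IllegalArgumentException("Event is larger than the maximum batch size");
            }
            if (!batch.tryAdd(event)) {
                sent.add(batch);                         // send the full batch
                batch = new SketchBatch(maxSizeInBytes); // start a new one
                batch.tryAdd(event);                     // fits, given the size check above
            }
        }
        if (batch.count() > 0) {
            sent.add(batch); // send the final, partially filled batch
        }
        return sent;
    }
}
```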

Java Sample

.NET Sample

Python Sample

JavaScript Sample

You can jump to the proposed API section if you want to skip all the options considered.

Option 1

Add a send API that takes a finite collection of events (like an array) to send. This API converts the input collection into EventDataBatch instances and sends them to Event Hubs. If a batch fails to send in the middle of the operation (after all of its retries are exhausted), the API throws an exception that contains the error details and the list of all events from the original user-provided collection that could not be sent.

Note that this API will not support an Iterator that can generate an infinite stream of events.

class EventHubProducerAsyncClient {
    Mono<Void> send(List<EventData> events);
    Mono<Void> send(List<EventData> events, SendOptions options);
}

class SendException extends AmqpException {
    // Gets the events that were unable to be sent 
    // (including the ones in the batch that failed).
    List<EventData> getEvents();
}

class SendOptions {
    String partitionKey();
    String partitionId();
    int maxBatchSizeInBytes();
}
User Code
producerClient
    .send(events)
    .subscribe(unused -> {},
        ex -> {
            if (ex instanceof SendException) {
                SendException sendException = (SendException) ex;
                List<EventData> failedEvents = sendException.getEvents();
            }
        });
Concerns
  • The send API has to iterate over the entire collection even if sending one of the batches fails.
  • JS - It's not clear to the user that the exception thrown is of a specific type and has the list of failed events.
  • If there are failed events, the user has to repeat the process, and the code to do that is no simpler than the tryAdd() pattern we have today, which defeats the purpose of this API.
  • If the process shuts down in the middle of sending, the user has no information about how many events in the collection were successfully sent and how many were not.
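A minimal sketch of what Option 1's send(List) might do internally, under simplifying assumptions: event size is approximated by String.length(), the sendBatch predicate is a hypothetical stand-in for sending one batch (returning false once retries are exhausted), and SketchSendException is a hypothetical mirror of the proposed SendException. It stops at the first failed batch and reports that batch plus every later event as unsent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical mirror of the proposed SendException: carries every event that was not sent.
class SketchSendException extends RuntimeException {
    private final List<String> unsentEvents;

    SketchSendException(String message, List<String> unsentEvents) {
        super(message);
        this.unsentEvents = unsentEvents;
    }

    List<String> getEvents() {
        return unsentEvents;
    }
}

class Option1Sender {
    // Splits events, in order, into batches whose total String length stays within maxBatchSize.
    static List<List<String>> toBatches(List<String> events, int maxBatchSize) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentSize = 0;
        for (String event : events) {
            if (event.length() > maxBatchSize) {
                throw new IllegalArgumentException("Event exceeds the maximum batch size: " + event);
            }
            if (currentSize + event.length() > maxBatchSize) {
                batches.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(event);
            currentSize += event.length();
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    // sendBatch stands in for the network call; it returns false when a batch fails.
    static void send(List<String> events, int maxBatchSize, Predicate<List<String>> sendBatch) {
        List<List<String>> batches = toBatches(events, maxBatchSize);
        for (int i = 0; i < batches.size(); i++) {
            if (!sendBatch.test(batches.get(i))) {
                // Collect the failed batch plus every batch after it.
                List<String> unsent = new ArrayList<>();
                for (int j = i; j < batches.size(); j++) {
                    unsent.addAll(batches.get(j));
                }
                throw new SketchSendException("Batch " + i + " failed to send", unsent);
            }
        }
    }
}
```

Building the unsent list still means walking the rest of the collection, which is the first concern listed above.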

Option 2

A small tweak to the above option is to instead return a response. The response can contain the status and if there are errors, the response will have the list of events that were not sent.

class EventHubProducerAsyncClient {
    Mono<SendResponse> send(List<EventData> events);
    Mono<SendResponse> send(List<EventData> events, SendOptions options);
}

class SendResponse {
    // returns true if there was at least one event that was not sent
    boolean isError();

    // Gets the events that were unable to be sent
    // (including the ones in the batch that failed).
    List<EventData> getFailedEvents();
    Throwable getException();
}
User code
producerClient
    .send(events)
    .subscribe(sendResponse -> {
        if (sendResponse.isError()) {
            List<EventData> failedEvents = sendResponse.getFailedEvents();
        }
    });

This API addresses the exception-type concern from the previous option, but all other concerns still hold.

Concerns
  • The send API has to iterate over the entire collection even if sending one of the batches fails.
  • If there are failed events, the user has to repeat the process, and the code to do that is no simpler than the tryAdd() pattern we have today, which defeats the purpose of this API.
  • If the process shuts down in the middle of sending, the user has no information about how many events in the collection were successfully sent and how many were not.

Option 3

Improving on the previous option, this one addresses the concern about having to iterate over the entire collection when there are failures.

class EventHubProducerAsyncClient {
    Mono<SendResponse> send(List<EventData> events);
    Mono<SendResponse> send(List<EventData> events, SendOptions options);
}

class SendResponse {
    // set to true if there was at least one event that was not sent
    boolean isError; 
    // Instead of returning the failed events in a list, return the count of 
    // successfully sent events. User can resume sending from the 
    // next index of the original collection.
    long successfullySentEventCount;
    Throwable exception;
}
Concerns
  • If there are failed events, the user has to repeat the process, and the code to do that is no simpler than the tryAdd() pattern we have today, which defeats the purpose of this API.
  • If the process shuts down in the middle of sending, the user has no information about how many events in the collection were successfully sent and how many were not.
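Option 3's count-based response lets the caller resume from the first unsent index instead of resending everything. The sketch below forms batches by event count (maxEventsPerBatch) rather than by size, to keep it short; SketchSendResponse, Option3Sender, and the sendBatch predicate are hypothetical.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical mirror of Option 3's SendResponse.
class SketchSendResponse {
    final boolean isError;
    final long successfullySentEventCount;

    SketchSendResponse(boolean isError, long successfullySentEventCount) {
        this.isError = isError;
        this.successfullySentEventCount = successfullySentEventCount;
    }
}

class Option3Sender {
    // Sends in-order batches of up to maxEventsPerBatch events and stops at the
    // first batch that fails, reporting how many leading events were sent.
    static SketchSendResponse send(List<String> events, int maxEventsPerBatch,
                                   Predicate<List<String>> sendBatch) {
        long sent = 0;
        for (int start = 0; start < events.size(); start += maxEventsPerBatch) {
            List<String> batch = events.subList(start, Math.min(start + maxEventsPerBatch, events.size()));
            if (!sendBatch.test(batch)) {
                return new SketchSendResponse(true, sent);
            }
            sent += batch.size();
        }
        return new SketchSendResponse(false, sent);
    }

    // Caller-side loop: resume from the first unsent index rather than
    // re-sending the whole collection. Returns the total number of events sent.
    static int sendWithResume(List<String> events, int maxEventsPerBatch,
                              Predicate<List<String>> sendBatch, int maxAttempts) {
        int offset = 0;
        for (int attempt = 0; attempt < maxAttempts && offset < events.size(); attempt++) {
            SketchSendResponse response =
                send(events.subList(offset, events.size()), maxEventsPerBatch, sendBatch);
            offset += (int) response.successfullySentEventCount;
        }
        return offset;
    }
}
```

The caller still owns the retry loop, which is why the first concern above remains.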

Option 4

Instead of internally managing both the creation of batches and sending, another option is to keep these two operations separate but make it a little easier for the user to create batches and send them. In this option, we still take a finite collection of events, but return a response containing a list of batches. The user can then send those batches themselves, as they do today.

class EventHubProducerAsyncClient {
    CreateBatchResponse createBatches(List<EventData> events);
}

class CreateBatchResponse {
    List<EventDataBatch> eventDataBatches;
    List<EventData> invalidEvents;
}
Concerns
  • This API adds very little value, and users who do not understand the batching details will not like it
  • Sending events is still a 2-step process

Option 4.1

This is a slight variation of option 4. Instead of returning a new type CreateBatchResponse which includes valid event batches and invalid events, in this option, we return a list of event batches and if there are any invalid events in the input list, an exception is thrown.

class EventHubProducerAsyncClient {
    List<EventDataBatch> createBatches(List<EventData> events) throws CreateBatchException;
}

class CreateBatchException extends AmqpException {
    String message;
    int failedEventIndex;
}

User code

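// With the async client, each Mono returned by send() must be subscribed to; block() is used here for brevity.
producerClient.createBatches(events).forEach(batch -> producerClient.send(batch).block());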
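A sketch of Option 4.1's createBatches(), assuming a batch's size is the sum of its events' payload lengths. SketchCreateBatchException and Option41Batcher are hypothetical names; the failedEventIndex field mirrors the one in CreateBatchException above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mirror of Option 4.1's CreateBatchException.
class SketchCreateBatchException extends RuntimeException {
    private final int failedEventIndex;

    SketchCreateBatchException(String message, int failedEventIndex) {
        super(message);
        this.failedEventIndex = failedEventIndex;
    }

    int getFailedEventIndex() {
        return failedEventIndex;
    }
}

class Option41Batcher {
    // Packs events, in order, into batches whose summed payload length stays within
    // maxBatchSize. Fails fast on the first event that can never fit in any batch.
    static List<List<byte[]>> createBatches(List<byte[]> events, int maxBatchSize) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentSize = 0;
        for (int i = 0; i < events.size(); i++) {
            byte[] event = events.get(i);
            if (event.length > maxBatchSize) {
                throw new SketchCreateBatchException("Event is too large for a batch", i);
            }
            if (currentSize + event.length > maxBatchSize) {
                batches.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(event);
            currentSize += event.length;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```

Because the exception is thrown before any batch is returned, nothing has been sent when a poison event is found; the caller can use the index to drop or fix that event and call createBatches() again.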

Option 5

Add a send API that takes one event at a time and a callback that will be invoked when the event is either successfully sent or if there's a failure sending the event. Internally, the SDK will buffer and send batches to efficiently send events and not make a network call to send every event.

This API is similar to the Kafka producer's send(record, callback) API.

class EventHubProducerAsyncClient {
    Mono<Void> send(EventData event, Consumer<SendResponse> onCompletion);
}

class SendResponse {
    boolean isError; 
    Throwable throwable;
    EventData event; // for user to know which event failed
}
Concerns
  • This needs to account for a variety of things:

    • Buffer size
    • Duration to wait
    • Failure handling
    • Order of events sent
    • Ability to flush events
    • If the process shuts down, users will not know which events were sent and which were still in the buffer
  • This is an advanced scenario and should only be added if we have a strong user requirement and have thought through all the scenarios
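To make the Option 5 concerns concrete, here is a deliberately simplified, single-threaded sketch of a buffered producer with per-event callbacks. It flushes only when a buffered-event count is reached or when flush() is called; time-based flushing, retries, thread safety, and shutdown handling are left out, which are the open questions listed above. BufferedSketchProducer, SketchEventResult, and the sendBatch predicate are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical per-event outcome mirroring Option 5's SendResponse.
class SketchEventResult {
    final String event;
    final boolean isError;

    SketchEventResult(String event, boolean isError) {
        this.event = event;
        this.isError = isError;
    }
}

// Events are buffered and sent as one batch when the buffer reaches
// maxBufferedEvents or when flush() is called explicitly.
class BufferedSketchProducer {
    private final int maxBufferedEvents;
    private final Predicate<List<String>> sendBatch; // stands in for the network call
    private final List<String> buffer = new ArrayList<>();
    private final List<Consumer<SketchEventResult>> callbacks = new ArrayList<>();

    BufferedSketchProducer(int maxBufferedEvents, Predicate<List<String>> sendBatch) {
        this.maxBufferedEvents = maxBufferedEvents;
        this.sendBatch = sendBatch;
    }

    void send(String event, Consumer<SketchEventResult> onCompletion) {
        buffer.add(event);
        callbacks.add(onCompletion);
        if (buffer.size() >= maxBufferedEvents) {
            flush();
        }
    }

    // Sends whatever is buffered and notifies each event's callback of the batch outcome.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<String> batch = new ArrayList<>(buffer);
        List<Consumer<SketchEventResult>> pending = new ArrayList<>(callbacks);
        buffer.clear();
        callbacks.clear();
        boolean succeeded = sendBatch.test(batch);
        for (int i = 0; i < batch.size(); i++) {
            pending.get(i).accept(new SketchEventResult(batch.get(i), !succeeded));
        }
    }

    int bufferedCount() {
        return buffer.size();
    }
}
```

Anything still in the buffer when the process exits is lost without any callback firing, which is the last sub-bullet above.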

Option 6

In this option, we add 2 new APIs - send() and createBatches(). The send() API takes an iterable of events that will be sent in a single batch. If the events in the iterable exceed the batch limit, an exception is thrown. The createBatches() API is similar to option 4.1 above. It takes a finite list of events as input and returns a list of event batches. If any event in the list is too big for a batch or is corrupt in any other way, an exception will be thrown.

class EventHubProducerAsyncClient {
    Mono<Void> send(Iterable<EventData> events) throws SendException;
    Flux<EventDataBatch> createBatches(List<EventData> events) throws CreateBatchException;
}

class SendException extends AmqpException {
    String message;
}

class CreateBatchException extends AmqpException {
    String message;
    int failedEventIndex;
}
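Option 6's send(Iterable) is all or nothing: the events go out as one batch or not at all. A minimal sketch of the size check this implies, assuming event size is approximated by String.length(); Option6Sender.toSingleBatch is a hypothetical helper, not an SDK method.

```java
import java.util.ArrayList;
import java.util.List;

class Option6Sender {
    // Gathers an Iterable into exactly one batch. If the combined size exceeds
    // maxBatchSize, it throws before anything would be sent.
    static List<String> toSingleBatch(Iterable<String> events, int maxBatchSize) {
        List<String> batch = new ArrayList<>();
        int size = 0;
        for (String event : events) {
            size += event.length();
            if (size > maxBatchSize) {
                throw new IllegalStateException(
                    "Events exceed the maximum batch size of " + maxBatchSize + "; use createBatches() instead");
            }
            batch.add(event);
        }
        return batch;
    }
}
```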

Option 7

In this option, the createBatches API takes a stream of events and returns a stream of event batches. This API additionally includes a max wait time after which a batch is yielded even if it is not full. If an event is corrupt or too big to fit into a batch, the API yields the batch created so far (before the poison event) and then follows it with an exception.

class EventHubProducerAsyncClient {
    Flux<EventDataBatch> createBatches(Flux<EventData> events, Duration maxWaitTime);
}
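A sketch of Option 7's streaming behavior without Reactor, simulating the stream as timestamped events so that it runs deterministically. TimedEvent, Option7Batcher, and the emit callback (standing in for the batches a Flux would yield) are hypothetical, and sizes are approximated by String.length().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical timestamped event, used to simulate a stream without a real clock.
class TimedEvent {
    final long arrivalMillis;
    final String payload;

    TimedEvent(long arrivalMillis, String payload) {
        this.arrivalMillis = arrivalMillis;
        this.payload = payload;
    }
}

class Option7Batcher {
    // Emits the current batch when the next event would overflow it or arrives more
    // than maxWaitMillis after the batch was started. A poison event (larger than
    // maxBatchSize) causes the batch built so far to be emitted first, then an exception.
    // Limitation: the wait time is only checked when an event arrives; a real
    // implementation would also need a timer for an idle stream.
    static void batch(Iterable<TimedEvent> stream, int maxBatchSize, long maxWaitMillis,
                      Consumer<List<String>> emit) {
        List<String> current = new ArrayList<>();
        int currentSize = 0;
        long batchStart = 0;
        for (TimedEvent event : stream) {
            int size = event.payload.length();
            boolean waitedTooLong = event.arrivalMillis - batchStart > maxWaitMillis;
            if (!current.isEmpty() && (waitedTooLong || currentSize + size > maxBatchSize)) {
                emit.accept(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            if (size > maxBatchSize) {
                throw new IllegalArgumentException("Poison event larger than a batch: " + event.payload);
            }
            if (current.isEmpty()) {
                batchStart = event.arrivalMillis;
            }
            current.add(event.payload);
            currentSize += size;
        }
        if (!current.isEmpty()) {
            emit.accept(current);
        }
    }
}
```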

Option 8

In this option, the createBatches API takes a finite collection of events and returns a list of batches. This API can additionally include an option to select a batching strategy, such as batching events in order or batching events for optimal space utilization.

class EventHubProducerAsyncClient {
    Mono<Void> send(Iterable<EventData> events) throws SendException;
    List<EventDataBatch> createBatches(List<EventData> events, CreateBatchOptions createBatchOptions);
}

class CreateBatchOptions {
    BatchingStrategy batchingStrategy;
    // other existing options like partition key, partition id
}

class SendException extends AmqpException {
    String message;
}
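Option 8 leaves the strategies open. One plausible reading, shown as a sketch: an in-order strategy that starts a new batch whenever the next event does not fit, and a space-efficient strategy that uses the first-fit decreasing bin-packing heuristic. The enum values and class names are hypothetical, and sizes are approximated by String.length().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical values for the BatchingStrategy type named in Option 8.
enum SketchBatchingStrategy { IN_ORDER, SPACE_EFFICIENT }

class Option8Batcher {
    static List<List<String>> createBatches(List<String> events, int maxBatchSize,
                                            SketchBatchingStrategy strategy) {
        for (String event : events) {
            if (event.length() > maxBatchSize) {
                throw new IllegalArgumentException("Event is too large for a batch: " + event);
            }
        }
        return strategy == SketchBatchingStrategy.IN_ORDER
            ? inOrder(events, maxBatchSize)
            : firstFitDecreasing(events, maxBatchSize);
    }

    // Preserves event order: start a new batch whenever the next event does not fit.
    private static List<List<String>> inOrder(List<String> events, int maxBatchSize) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentSize = 0;
        for (String event : events) {
            if (currentSize + event.length() > maxBatchSize) {
                batches.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(event);
            currentSize += event.length();
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    // First-fit decreasing heuristic: largest events first, each placed in the first
    // batch with room. Often needs fewer batches, but does not preserve event order.
    private static List<List<String>> firstFitDecreasing(List<String> events, int maxBatchSize) {
        List<String> sorted = new ArrayList<>(events);
        sorted.sort((a, b) -> Integer.compare(b.length(), a.length()));
        List<List<String>> batches = new ArrayList<>();
        List<Integer> sizes = new ArrayList<>();
        for (String event : sorted) {
            int target = -1;
            for (int i = 0; i < batches.size() && target < 0; i++) {
                if (sizes.get(i) + event.length() <= maxBatchSize) {
                    target = i;
                }
            }
            if (target < 0) {
                batches.add(new ArrayList<>());
                sizes.add(0);
                target = batches.size() - 1;
            }
            batches.get(target).add(event);
            sizes.set(target, sizes.get(target) + event.length());
        }
        return batches;
    }
}
```

For example, events of lengths 6, 5, 4, 5 with a limit of 10 produce three batches in order, but two with first-fit decreasing (6+4 and 5+5), at the cost of reordering the events.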

Other suggestions

  • Provide a send API with different batching strategies
  • Each application has different requirements on sending and the primitive we have today allows users to write convenient APIs on top that are suitable for their application

Proposed API

We realized that each alternative discussed above favored one segment of the user base over another and forced us to make assumptions about customer use cases. Therefore, we chose to provide the right API primitives and trust customers to know their applications best.

If customers want to ensure that their batch is under the size limit, they can use createBatch(). If they are confident that their events will not exceed the limit, they can use send(..).

class EventHubProducerAsyncClient {
    Mono<Void> send(Iterable<EventData> events) throws AmqpException;
    Mono<Void> send(Iterable<EventData> events, SendOptions sendOptions) throws AmqpException;
}

User Code

Using a synchronous producer client
// Sending a list of events to partitions in round-robin
public void sendEvents(List<EventData> events) {
	EventHubProducerClient producerClient = new EventHubClientBuilder()
		.connectionString("<connection-string>")
		.buildProducerClient();
	
	producerClient.send(events);
}

// Sending a list of events to a specific partition id
public void sendEvents(List<EventData> events) {
	EventHubProducerClient producerClient = new EventHubClientBuilder()
		.connectionString("<connection-string>")
		.buildProducerClient();
	
	SendOptions sendOptions = new SendOptions().setPartitionId("1");
	producerClient.send(events, sendOptions);
}

// Sending a list of events with a partition key
public void sendEvents(List<EventData> events) {
	EventHubProducerClient producerClient = new EventHubClientBuilder()
		.connectionString("<connection-string>")
		.buildProducerClient();
	
	SendOptions sendOptions = new SendOptions().setPartitionKey("my-partition-key");
	producerClient.send(events, sendOptions);
}
Using an asynchronous producer client
// Sending a list of events to partitions in round-robin
public void sendEventsAsync(List<EventData> events) {
	EventHubProducerAsyncClient asyncProducerClient = new EventHubClientBuilder()
		.connectionString("<connection-string>")
		.buildAsyncProducerClient();
	
	asyncProducerClient
		.send(events)
		.subscribe(ignore -> {},
			ex -> System.out.println("Sending failed " + ex.getMessage()),
			() -> System.out.println("Sending events completed successfully"));

	// wait until the async send operation is complete before exiting the process
}

// Sending a list of events to a specific partition id
public void sendEventsAsync(List<EventData> events) {
	EventHubProducerAsyncClient asyncProducerClient = new EventHubClientBuilder()
		.connectionString("<connection-string>")
		.buildAsyncProducerClient();
	
	SendOptions sendOptions = new SendOptions().setPartitionId("1");
	asyncProducerClient
		.send(events, sendOptions)
		.subscribe(ignore -> {},
			ex -> System.out.println("Sending failed " + ex.getMessage()),
			() -> System.out.println("Sending events completed successfully"));

	// wait until the async send operation is complete before exiting the process
}

// Sending a list of events with a partition key
public void sendEventsAsync(List<EventData> events) {
	EventHubProducerAsyncClient asyncProducerClient = new EventHubClientBuilder()
		.connectionString("<connection-string>")
		.buildAsyncProducerClient();
	
	SendOptions sendOptions = new SendOptions().setPartitionKey("my-partition-key");
	asyncProducerClient
		.send(events, sendOptions)
		.subscribe(ignore -> {},
			ex -> System.out.println("Sending failed " + ex.getMessage()),
			() -> System.out.println("Sending events completed successfully"));

	// wait until the async send operation is complete before exiting the process
}
@YijunXieMS

YijunXieMS commented Apr 7, 2020

Python version of the final proposed API

API

class EventHubProducerClient:
    async def send_batch(self, event_data_batch: Union[List[EventData], EventDataBatch], *, partition_key=None, partition_id=None, timeout=None) -> None:
        '''
        When an EventDataBatch is passed in, partition_key and partition_id are not allowed, because the EventDataBatch may already have these properties set.
        raises: ValueError, EventHubError.
        '''

Sample

try:
    await producer_client.send_batch(events, partition_id="0")
except ValueError:
    # The events exceed the size limit of a single batch.
    pass
except EventHubError:
    # Other errors, such as failing to connect to the event hub.
    pass

@chradek

chradek commented Apr 8, 2020

JavaScript API

Sending Events

The only change is that the existing sendBatch API now accepts either an EventDataBatch or an array of EventData as the first parameter, as opposed to just an EventDataBatch.

class EventHubProducerClient {
  async sendBatch(batch: EventDataBatch | EventData[], options?: SendBatchOptions): Promise<void>;
}

@jsquire

jsquire commented Apr 14, 2020

.NET API (option 6)

public class EventHubProducerClient
{
    public virtual async Task SendAsync(IEnumerable<EventData> events, CancellationToken cancellationToken = default);
    public virtual async Task SendAsync(IEnumerable<EventData> events, SendEventOptions options, CancellationToken cancellationToken = default);

    // NOTE: ValueTask used for consistency with CreateBatchAsync; we may want to discuss making it Task.
    public virtual async ValueTask<IEnumerable<EventDataBatch>> CreateBatchesAsync(IReadOnlyList<EventData> events, CancellationToken cancellationToken = default);
    public virtual async ValueTask<IEnumerable<EventDataBatch>> CreateBatchesAsync(IReadOnlyList<EventData> events, CreateBatchOptions options, CancellationToken cancellationToken = default);
}

public class BatchCreationException : EventHubsException
{
    // Where in the sequence of events used to create the batch was the failure?
    public int FailedEventIndex { get; }
    
    // Inherited from EventHubsException; included for context.    
    public bool IsTransient { get; } = false;
    public string EventHubName { get; }
    public FailureReason Reason { get; }  // ( FailureReason.MessageSizeExceeded || FailureReason.GeneralError)
    public override string Message { get; }  // (Too large for batch or check inner exception for details)
}

@jsquire

jsquire commented Apr 14, 2020

class EventBatchSizeExceededError extends Error {
    failedEventIndex: number;
}

function isEventBatchSizeExceededError(err: any): err is EventBatchSizeExceededError;

@chradek: What happens if creating the batch fails because an individual event couldn't be serialized to the necessary AMQP format? We'd probably want to surface that scenario the same way as a message being too large, since both are poison events that would never be able to be added to the batch. It may be worth considering a less specific name here.

@YijunXieMS: Same question with respect to the EventHubSendOversizeException in your sample.

@YijunXieMS

Together with the new create_batches, we should add a public method on EventDataBatch that exposes which events are included in a batch.
After create_batches, users send the batches out one by one, and after a batch is sent, they might want to look up which events it contained.
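A minimal sketch of that suggestion, written in Java like the other examples in this gist: a batch type that keeps a read-only view of the events it holds, so a caller can see what a sent batch contained. TrackedBatch and getEvents() are hypothetical names, and size is approximated by String.length().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical batch type that keeps a reference to the events it contains.
class TrackedBatch {
    private final int maxSizeInBytes;
    private final List<String> events = new ArrayList<>();
    private int sizeInBytes;

    TrackedBatch(int maxSizeInBytes) {
        this.maxSizeInBytes = maxSizeInBytes;
    }

    boolean tryAdd(String event) {
        if (sizeInBytes + event.length() > maxSizeInBytes) {
            return false;
        }
        events.add(event);
        sizeInBytes += event.length();
        return true;
    }

    // Read-only view, so callers can check which events a sent batch contained
    // without being able to modify the batch.
    List<String> getEvents() {
        return Collections.unmodifiableList(events);
    }
}
```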

@srnagar
Author

srnagar commented Apr 21, 2020

Proposed .NET API:

public class EventHubProducerClient
{
    public virtual async Task SendAsync(IEnumerable<EventData> events, CancellationToken cancellationToken = default);
    public virtual async Task SendAsync(IEnumerable<EventData> events, SendEventOptions options, CancellationToken cancellationToken = default);
}

@srnagar
Author

srnagar commented Apr 23, 2020

.NET user code samples:

Send events

IEnumerable<EventData> events = Enumerable
    .Range(0, 25)
    .Select(index => new EventData(Encoding.UTF8.GetBytes(new string('X', index + 5))));
var connectionString = BuildConnectionString();
await using (var producer = new EventHubProducerClient(connectionString))
{
    await producer.SendAsync(events);
}

Send events with a partition key

IEnumerable<EventData> events = Enumerable
    .Range(0, 25)
    .Select(index => new EventData(Encoding.UTF8.GetBytes(new string('X', index + 5))));
var connectionString = BuildConnectionString();
await using (var producer = new EventHubProducerClient(connectionString))
{
    var sendOptions = new SendEventOptions { PartitionKey = "some123key-!d" };
    await producer.SendAsync(events, sendOptions);
}
