Skip to content

Instantly share code, notes, and snippets.

@ramya-rao-a
Last active October 10, 2019 01:25
Show Gist options
  • Save ramya-rao-a/a54d8afb5b5cf292a250e58f42789a44 to your computer and use it in GitHub Desktop.
Save ramya-rao-a/a54d8afb5b5cf292a250e58f42789a44 to your computer and use it in GitHub Desktop.
Event Hubs library proposal where sender and receiver have their own clients
/*
Key changes:
- A dedicated "client" for EPH, making this an EventHubConsumerClient that customers would first gravitate towards
- start() & stop() would be top level methods
- In .Net, there would be 4 settable callbacks. These would be in the builder for Java.
- For Python & JS, these would be passable to the start() method
- State management is done via the PartitionContext that gets passed to the callbacks.
- .Net & Java will have to allow users to extend the base class to store the state
- This results in a dedicated client for send for symmetry, and so EventHubProducerClient
- send() & createBatch() would be top level methods, no need to create producers and maintain them
- need to check with service if there is ever a need for users to maintain each producer link
- Low level consumer is not off of any client. This is a stand-alone class.
- In Python, .Net & Java, this would be under sub namespaces/modules
- All 3 entry points above would have constructor overloads such that they support
- connection string
- connection string, event hub name
- fully qualified namespace, event hub name, credentials
- Advanced: New object "EventHubConnection" when wanting to have clients or low level consumers share a connection
*/
// ================================= Send start ===========================================
/**
 * Send events without targeting a partition.
 *
 * Builds a producer client from the ambient `connectionString`, asks it for a
 * batch with default options, and sends that batch.
 *
 * Declared `async` because the body awaits `createBatch` and `sendBatch`;
 * `await` is a syntax error inside a plain function.
 *
 * @returns {Promise<void>} Settles after `sendBatch` settles.
 */
async function sendToAny() {
  const producerClient = new EventHubProducerClient(connectionString);
  const myBatch = await producerClient.createBatch();
  // add code to populate the batch
  await producerClient.sendBatch(myBatch);
}
/**
 * Send events using a partition key.
 *
 * The batch is created with a partition key and a size cap, then sent with the
 * ambient `sendOptions`. `connectionString` and `sendOptions` are not defined
 * in this function; they are expected to exist in the enclosing scope.
 *
 * @returns {Promise<void>} Settles after `sendBatch` settles.
 */
async function sendWithPartitionKey() {
  const producerClient = new EventHubProducerClient(connectionString);
  const batchOptions = {
    partitionKey: "Apples!", // If partitionId is also set, then throw error when creating batch
    maxSizeInBytes: 1000,
  };
  const myBatch = await producerClient.createBatch(batchOptions);
  // add code to populate the batch
  await producerClient.sendBatch(myBatch, sendOptions);
}
/**
 * Send events targeting a specific partition.
 *
 * The batch is created for partition "2" and sent with the ambient
 * `sendOptions`. `connectionString` and `sendOptions` are expected to exist in
 * the enclosing scope.
 *
 * @returns {Promise<void>} Settles after `sendBatch` settles.
 */
async function sendToPartition() {
  const producerClient = new EventHubProducerClient(connectionString);
  const batchOptions = {
    partitionId: "2", // If partitionKey is also set, then throw error when creating batch
  };
  // creates a new amqp link if this is the first time we see this partitionId, else re-uses link
  const myBatch = await producerClient.createBatch(batchOptions);
  await producerClient.sendBatch(myBatch, sendOptions);
}
/**
 * Send events to the same partition over separate AMQP links that share one
 * connection.
 *
 * This mimics Track 1's creation of multiple producers from the same client.
 * Need to check with service if this is a case we should care about.
 * Regardless, this is an advanced case, can be an add on, need not be in GA.
 *
 * Only the construction is shown: one connection is created and handed to two
 * producer clients.
 */
function sendWithDifferentLinksameConnection() {
  const sharedConnection = new EventHubConnection(connectionString);
  const [producerClient1, producerClient2] = [1, 2].map(
    () => new EventHubProducerClient(sharedConnection)
  );
}
// ================================= Receive start ===========================================
/**
 * Receive events.
 *
 * Creates a consumer client for the ambient `connectionString` and
 * `consumerGroupName` and starts it with a single event handler. The handler
 * logs how many events arrived and from which partition, then stores a value
 * on the partition context to show that state can be attached to it.
 */
function receiveAll() {
  function handleBatch(batch, context) {
    const { partitionId } = context;
    console.log(`Received ${batch.length} events from partition with id ${partitionId}`);
    console.log(`I can set state on it as well!!`);
    context.state = "hello!";
  }

  new EventHubConsumerClient(connectionString, consumerGroupName).start(handleBatch);
}
/**
 * Receive events, add error callbacks, initialize.
 *
 * Shows the two call shapes proposed for `start()`: positional callbacks, or
 * a single object carrying all four callbacks. Both calls are made on the same
 * client, one after the other. The callbacks themselves (`processEvents`,
 * `onError`, `onPartitionInitialize`, `onPartitionClose`) are expected to
 * exist in the enclosing scope.
 */
function receiveAllWithAllCallbacks() {
  const client = new EventHubConsumerClient(connectionString, consumerGroupName);

  // Shape 1: positional callbacks.
  client.start(processEvents, onError);

  // Shape 2: one object with every callback.
  const callbacks = { processEvents, onError, onPartitionInitialize, onPartitionClose };
  client.start(callbacks);
}
/**
 * Receive events with checkpointing.
 *
 * Creates a consumer client that is given the ambient `checkpointManager`,
 * then starts it with a handler that demonstrates three ways to checkpoint.
 * All three are issued for the last event of each delivered batch; an empty
 * batch is skipped because there is nothing to checkpoint.
 *
 * NOTE(review): the proposal does not say whether `updateCheckpoint` returns a
 * promise. If it does, the handler should become `async` and await each call.
 */
function receiveAllWithCheckpoints() {
  const consumerClient = new EventHubConsumerClient(connectionString, consumerGroupName, checkpointManager);
  const processEvents = (events, partitionContext) => {
    if (events.length === 0) {
      return;
    }
    // The handler receives a batch; checkpoint against its most recent event.
    const event = events[events.length - 1];

    // Many ways to checkpoint
    partitionContext.updateCheckpoint(event); // The given event is checkpointed
    partitionContext.updateCheckpoint(event.offset); // The given offset is checkpointed

    // Custom checkpointing!!
    // NOTE(review): the proposal leaves the payload for the checkpoint manager
    // unspecified; this shape is a placeholder, confirm against the final API.
    checkpointManager.updateCheckpoint({
      partitionId: partitionContext.partitionId,
      offset: event.offset,
    });
  };
  consumerClient.start(processEvents);
}
// ================================= Low level consumer ===========================================
/**
 * Receive from a given partition using the low level consumer.
 *
 * Builds a partition consumer from the ambient `connectionString`,
 * `consumerGroupName`, `partitionId` and `eventPosition`, then requests 10
 * events from it.
 *
 * Declared `async` because the body awaits `consumer.receive`; `await` is a
 * syntax error inside a plain function.
 *
 * NOTE(review): a second function named `receive` is declared later in this
 * file; one of the two needs a distinct name before both can be used.
 *
 * @returns {Promise<*>} Resolves to whatever `consumer.receive(10)` yields.
 */
async function receive() {
  const consumer = new EventHubPartitionConsumer(connectionString, consumerGroupName, partitionId, eventPosition);
  const events = await consumer.receive(10);
  return events;
}
/**
 * Share one connection among consumers of different partitions.
 *
 * This mimics Track 1's creation of multiple consumers from the same client.
 * Regardless, this is an advanced case, can be an add on, need not be in GA.
 *
 * Only the construction is shown: one connection is created and passed to two
 * partition consumers, for partitions "1" and "2".
 *
 * NOTE(review): an earlier function in this file is also named `receive`; one
 * of the two needs a distinct name before both can be used.
 */
function receive() {
  const connection = new EventHubConnection(connectionString);
  const consumerFor = (id) =>
    new EventHubPartitionConsumer(connection, consumerGroupName, id, eventPosition);

  const consumer1 = consumerFor("1");
  const consumer2 = consumerFor("2");
}
@srnagar
Copy link

srnagar commented Oct 3, 2019

Equivalent design for Java

    // ---------------------Send Operations----------------------------------
    /* Send events without specifying partition */
    private static void send(List<EventData> eventDataList) {
        // Build a producer client and send right away; no SendOptions are passed.
        // The empty connection string is a placeholder.
        new EventHubClientBuilder()
            .connectionString("")
            .buildProducerClient()
            .send(eventDataList);
    }

    /* Send events using a partition key - not a specific partition id */
    private static void sendWithPartitionKey(List<EventData> eventDataList) {
        // The empty connection string is a placeholder.
        EventHubProducerClient producer = new EventHubClientBuilder()
            .connectionString("")
            .buildProducerClient();

        // The partition key travels with the send call through SendOptions.
        producer.send(eventDataList, new SendOptions().setPartitionKey("partitionKey"));
    }

    /* Send events to a specific partition id */
    private static void sendToPartition(List<EventData> eventDataList) {
        // The empty connection string is a placeholder.
        EventHubProducerClient producer = new EventHubClientBuilder()
            .connectionString("")
            .buildProducerClient();

        // The target partition id ("1") travels with the send call through SendOptions.
        producer.send(eventDataList, new SendOptions().setPartitionId("1"));
    }

    /*
       Send events to the same partition without re-using the AMQP link, while still sharing the same connection
       This mimics Track 1's creation of multiple producers from the same client
       Need to check with service if this is a case we should care about.
       Regardless, this is an advanced case, can be an add on, need not be in GA
   */
    private static void sendToPartitionSeparateLinks(List<EventData> eventDataList) {
        // One connection, handed to a single builder that produces both clients.
        // connectionString is not declared in this method; it must come from the enclosing class.
        EventHubConnection sharedConnection = new EventHubConnection(connectionString);
        EventHubClientBuilder builder = new EventHubClientBuilder().connection(sharedConnection);
        EventHubProducerClient firstProducer = builder.buildProducerClient();
        EventHubProducerClient secondProducer = builder.buildProducerClient();

        // Each producer gets its own options object, both aimed at partition "1".
        SendOptions firstOptions = new SendOptions().setPartitionId("1");
        SendOptions secondOptions = new SendOptions().setPartitionId("1");
        firstProducer.send(eventDataList, firstOptions);
        secondProducer.send(eventDataList, secondOptions);
    }


    // ---------------------Receive Operations----------------------------------
    /* Receive events from all partitions with no load balancing or checkpointing*/
    private static void receiveAll() {
        // No partition manager is configured on the builder in this variant.
        // The empty connection string and consumer group are placeholders.
        EventHubConsumerClient client = new EventHubClientBuilder()
            .connectionString("")
            .consumerGroup("")
            .processEvents((context, event) -> {
                // process event
                context.getPartitionId();
                return Mono.empty();
            })
            .buildConsumerClient();

        // The sample starts the client and stops it immediately afterwards.
        client.start();
        client.stop();
    }

    /* Receive events from all partitions with load balancing and checkpointing*/
    private static void recieveAll(PartitionManager partitionManager) {
        // NOTE(review): the method name is spelled "recieveAll"; kept unchanged so existing callers still resolve.
        // The empty connection string and consumer group are placeholders.
        EventHubConsumerClient client = new EventHubClientBuilder()
            .connectionString("")
            .consumerGroup("")
            .partitionManager(partitionManager)
            .processEvents((context, event) -> {
                // process event
                context.getPartitionId();
                // The callback's result is whatever updateCheckpoint returns for this event.
                return context.updateCheckpoint(event);
            })
            .buildConsumerClient();

        // The sample starts the client and stops it immediately afterwards.
        client.start();
        client.stop();
    }

    /* Receive events from specific partitions */
    private static void receiveFromPartitions() {
        // Both low-level consumers are created on the same connection.
        // connectionString, consumerGroup, partitionId1/2 and eventPosition are not declared
        // in this method; they must come from the enclosing class.
        EventHubConnection sharedConnection = new EventHubConnection(connectionString);
        EventHubPartitionConsumer firstConsumer = new EventHubPartitionConsumer(sharedConnection,
            consumerGroup, partitionId1, eventPosition);
        EventHubPartitionConsumer secondConsumer = new EventHubPartitionConsumer(sharedConnection,
            consumerGroup, partitionId2, eventPosition);

        // Each consumer's events go to its own handler.
        firstConsumer.receive().subscribe(event -> processEvents1(event));
        secondConsumer.receive().subscribe(event -> processEvents2(event));
    }

@srnagar
Copy link

srnagar commented Oct 3, 2019

.NET

using System.Threading.Tasks;

// Usage samples for the proposed Event Hubs clients.
//
// connectionString, eventBatch, consumerGroup, partitionId1, partitionId2,
// eventPosition and cancellationSource are not declared in this class; the
// samples treat them as placeholders supplied by the surrounding code.
//
// Every method that awaits is declared "async Task" so that it compiles and
// so that callers can observe completion and failures.
class Samples
{
    //---------------- Send operations -------------------

    // Send events without specifying partition
    public async Task Send()
    {
        var eventHubProducerClient = new EventHubProducerClient(connectionString);
        await eventHubProducerClient.SendAsync(eventBatch);
    }


    // Send events using a partition key - not a specific partition id
    public async Task SendWithPartitionKey()
    {
        var sendOptions = new SendOptions
        {
            PartitionKey = "Key"
        };
        var eventHubProducerClient = new EventHubProducerClient(connectionString);
        await eventHubProducerClient.SendAsync(eventBatch, sendOptions);
    }

    // Send events to a specific partition id
    public async Task SendToSpecificPartition()
    {
        var sendOptions = new SendOptions
        {
            PartitionId = "1"
        };
        var eventHubProducerClient = new EventHubProducerClient(connectionString);
        await eventHubProducerClient.SendAsync(eventBatch, sendOptions);
    }

    /*
       Send events to the same partition without re-using the AMQP link, while still sharing the same connection
       This mimics Track 1's creation of multiple producers from the same client
       Need to check with service if this is a case we should care about.
       Regardless, this is an advanced case, can be an add on, need not be in GA
   */
    public async Task SendToPartitionsWithSeparateLinks()
    {
        var sendOptions = new SendOptions
        {
            PartitionId = "1"
        };

        // create a connection that will be shared by producers
        var connection = new EventHubConnection(connectionString);
        var eventHubProducerClient1 = new EventHubProducerClient(connection);
        var eventHubProducerClient2 = new EventHubProducerClient(connection);

        await eventHubProducerClient1.SendAsync(eventBatch, sendOptions);
        await eventHubProducerClient2.SendAsync(eventBatch, sendOptions);
    }

    //---------------- Receive operations -------------------
    // Receive from all partitions without load balancing or checkpointing
    public void ReceiveAll()
    {
        var eventHubConsumerClient = new EventHubConsumerClient(connectionString);
        eventHubConsumerClient.Received = async (partitionContext, events) =>
        {
            // Remember the last event seen so its sequence number can be checkpointed.
            EventData lastEvent = null;
            foreach (EventData e in events)
            {
                var partitionId = partitionContext.PartitionId;
                // process event
                lastEvent = e;
            }

            // An empty batch leaves nothing to checkpoint.
            if (lastEvent != null)
            {
                partitionContext.Checkpoint(lastEvent.SequenceNumber.Value);
            }
        };
        eventHubConsumerClient.Start();
        eventHubConsumerClient.Stop();
    }

    // Receive from all partitions with load balancing and checkpointing
    public void ReceiveAll(PartitionManager partitionManager)
    {
        var eventHubConsumerClient = new EventHubConsumerClient(connectionString, partitionManager);
        eventHubConsumerClient.Received = async (partitionContext, events) =>
        {
            // Remember the last event seen so its sequence number can be checkpointed.
            EventData lastEvent = null;
            foreach (EventData e in events)
            {
                var partitionId = partitionContext.PartitionId;
                // process event
                lastEvent = e;
            }

            // An empty batch leaves nothing to checkpoint.
            if (lastEvent != null)
            {
                partitionContext.Checkpoint(lastEvent.SequenceNumber.Value);
            }
        };

        eventHubConsumerClient.Start();
        eventHubConsumerClient.Stop();
    }

    // Receive from all partitions with load balancing, checkpointing and partition state management.
    // Named ReceiveAllWithState because a second ReceiveAll(PartitionManager) overload with an
    // identical signature cannot be declared in the same class.
    public void ReceiveAllWithState(PartitionManager partitionManager)
    {
        var eventHubConsumerClient = new EventHubConsumerClient<MyPartitionContext>(connectionString, partitionManager);
        eventHubConsumerClient.Received = async (myPartitionContext, events) =>
        {
            // Remember the last event seen so its sequence number can be checkpointed.
            EventData lastEvent = null;
            foreach (EventData e in events)
            {
                // NOTE(review): the original read myPartitionContext.Id here; PartitionId is used
                // to match the other samples, since MyPartitionContext extends PartitionContext.
                var partitionId = myPartitionContext.PartitionId;
                // process event
                lastEvent = e;
            }

            // An empty batch leaves nothing to checkpoint and no new state to record.
            if (lastEvent != null)
            {
                var checkpoint = lastEvent.SequenceNumber.Value;
                myPartitionContext.Checkpoint(checkpoint);
                myPartitionContext.UpdateState("newState" + checkpoint);
            }
        };

        eventHubConsumerClient.Start();
        eventHubConsumerClient.Stop();
    }

    // Class for managing user state per partition that extends PartitionContext
    // which has partition information and checkpoint method
    class MyPartitionContext : PartitionContext
    {
        String _myState;

        // maintain state per partition here; returns the state that was just stored
        public String UpdateState(String newState)
        {
            _myState = newState;
            return _myState;
        }
    }

    // Receive from a specific partition using low level api
    public async Task ReceivePartition()
    {
        var eventHubPartitionConsumer = new EventHubPartitionConsumer(connectionString, consumerGroup, partitionId1, eventPosition);
        await foreach (EventData currentEvent in eventHubPartitionConsumer.SubscribeToEvents(cancellationSource.Token))
        {
            // process event
            Console.Write("Processed event with sequence number " + currentEvent.SequenceNumber.Value);
        }
    }

    // Receive from specific partitions using same connection and different links
    public async Task ReceivePartitionSeparateLinks()
    {
        var connection = new EventHubConnection(connectionString);
        var eventHubPartitionConsumer1 = new EventHubPartitionConsumer(connection, consumerGroup, partitionId1, eventPosition);
        var eventHubPartitionConsumer2 = new EventHubPartitionConsumer(connection, consumerGroup, partitionId2, eventPosition);

        // Two back-to-back "await foreach" loops would not start reading the second
        // partition until the first enumeration ended, so each consumer gets its own
        // loop and both loops run concurrently.
        async Task ProcessPartitionAsync(EventHubPartitionConsumer consumer)
        {
            await foreach (EventData currentEvent in consumer.SubscribeToEvents(cancellationSource.Token))
            {
                // process events from the partition this consumer reads
            }
        }

        await Task.WhenAll(
            ProcessPartitionAsync(eventHubPartitionConsumer1),
            ProcessPartitionAsync(eventHubPartitionConsumer2));
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment