@srnagar
Last active January 26, 2022 23:17
Event Processor Load balancing

Load balancing for Event Processor is based on the number of Event Hub partitions and the number of Event Processor instances available. This assumes that all Event Processor instances are identical, that events from all partitions require the same processing, and that events are evenly distributed among partitions.

Terminology

Store - refers to any of the underlying mechanisms used to persist partition ownership + checkpoint information. It could be in-memory (for testing purposes), Azure blobs, Azure Tables or any other storage that provides strong consistency. Strong consistency is a requirement for this to work.

Partition id - An int (as string) value ranging from 0 to P-1. Currently, P is immutable once an Event Hub is set up. However, this is going to change in the future.

Owner id - Unique string identifier for an instance of Event Processor.

ETag - Unique identifier that gets generated by the store when ownership entry is updated.

Checkpoint - An integer value indicating the sequence number of the last successfully processed event of a partition. This document refers to the sequence number, but both the sequence number and the offset will be stored.

Storage

For a given Event Hub + consumer group combination, the storage should have at most one entry per partition. So, the primary key is (Event Hub name, consumer group, partition id).
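
The primary key constraint can be illustrated with a minimal in-memory sketch. The names `OwnershipKey`, `ownership_table` and `upsert` are hypothetical and exist only for this illustration; a real store would be Azure blobs, tables, or similar.

```python
from typing import Dict, NamedTuple


class OwnershipKey(NamedTuple):
    """Hypothetical primary key: one entry per partition per hub + consumer group."""
    event_hub_name: str
    consumer_group: str
    partition_id: str


# Hypothetical in-memory table keyed by the primary key.
ownership_table: Dict[OwnershipKey, dict] = {}


def upsert(key: OwnershipKey, owner_id: str) -> None:
    """Writing the same key twice replaces the entry instead of adding a second one."""
    ownership_table[key] = {"owner_id": owner_id}
```

Because the dictionary is keyed by the full tuple, a second write for the same partition overwrites the first, which is the "at most one entry per partition" property.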

Execution

Do the following when the instance comes up (on start()) and schedule it to run periodically every x seconds.

Each time this is run, the expectation is to claim ownership of at most 1 new partition.

  • Get the number of partitions in this Event Hub - number of partitions may no longer be immutable, so each time this information has to be retrieved. Let the number of partitions be P with partition ids 0 to P-1.
  • Get the list of current partition ownership for this Event Hub, consumer group from the store - each entry in the list will contain the following info:
    • partition id
    • owner id
    • sequence no, (offset?),
    • last modified time
    • owner level
    • ETag
    • Event Hub name
    • consumer group name
  • Depending on the data retrieved from above two steps, we will have one of the following scenarios:

Scenarios

  1. If the partition ownership list is empty and the first instance of Event Processor is starting up (N = 1)

    a. Pick a random partition id between 0 and P-1 where P is the number of partitions

    b. Claim ownership of this partition by sending request to the store

        initial event position (either earliest or user provided)
        owner id
        partition id
        owner level (set to 0 for now, may be used later)
    

    c. On successful claim, start processing events from the partition

    Example

    Number of partitions = 5

    Number of Event Processors = 1

    When the Event Processor is run for the very first time, there is no partition ownership information stored. So, the list is empty. All 5 partitions are available for the Event Processor. The Event Processor randomly chooses a partitionId (say 3) and starts processing events from this partition.

  2. If the partition ownership list contains N instances of Event Processors already running with each owning one partition, N+1th instance comes online (N = P)

    a. Remove entries not modified for more than y seconds

    b. N unique instances found with each owning one partition

    c. Current instance is not part of the list (this is the N+1th instance)

    d. Nothing to steal

    e. Stay inactive and periodically check if any partitions are available for claiming ownership

    Example

    Number of partitions = 5

    Number of Event Processors = 5

    Let's assume that each Event Processor owns 1 partition.

    Now, suppose a 6th Event Processor joins the pool. As each partition has its own Event Processor, adding new processors will not help in scaling. So, the excess Event Processors will remain inactive until an active Event Processor goes down.

    Let's check the math:

    Min partitions per event processor = 5 / 6 = 0

    Number of Event Processors with one additional partition = 5 % 6 = 5

    i.e. 5 Event Processors will own 1 partition and 1 will own none.

(The above scenario is the same for any number of instances that come online after N running instances each owning a partition)
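
The arithmetic used in the examples is integer division and remainder. A minimal sketch, assuming a hypothetical helper named `balanced_distribution`:

```python
from typing import Tuple


def balanced_distribution(partitions: int, processors: int) -> Tuple[int, int]:
    """Return (min partitions per processor, number of processors owning one extra)."""
    if processors <= 0:
        raise ValueError("at least one processor is required")
    # divmod gives the quotient (minimum share) and the remainder (extra partitions).
    return divmod(partitions, processors)
```

For the example above, `balanced_distribution(5, 6)` returns `(0, 5)`: the minimum share is 0, and 5 processors own one partition each.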

  3. If the partition ownership list contains N instances of Event Processors actively running with balanced partition ownership, this Event Processor is the N+1th instance starting up (N < P)

    a. Remove entries not modified for more than y seconds from the list

    b. Get a set of all unique instances from the list -> there will be N instances in this case. This instance (N+1)th is not in the set.

    c. Get a count of number of partitions owned by these N instances (none of them will have 2 more than the others) - assuming stable state was reached before N + 1 comes online

    d. (N+1)th instance is looking to steal work from others

    e. With N+1 instances running, each instance should at least own P/(N+1) partitions and none should own more than (P/(N+1)) + 1 partitions. Only P % (N + 1) hosts can own (P/(N+1)) + 1 partitions

    f. So, this instance will steal one (random) partition from any other instance that owns more than (P/(N+1)) + 1 partitions

    g. Claims ownership of this partition

    previously stored ETag is used to make the request to the store (the request will succeed only if the ETag in the request matches the current ETag in the store)
    sequence number is picked from the last checkpoint written by the instance from which this partition was stolen
    owner id
    partition id
    owner level (set to 0 for now, may be used later)
    

    h. On successful claim, start processing the partition

    Example

    Number of partitions = 18

    Number of Event Processors = 3

    Let's assume each event processor is processing 6 partitions and the load is evenly distributed.

    Now, if a 4th Event Processor joins the pool, this new Event Processor has to steal partitions from the other 3 Event Processors to balance the load. In order to steal a partition, this new Event Processor will look for another Event Processor owning the maximum number of partitions. In this case, all other Event Processors have the same number of partitions, so it randomly chooses any Event Processor to steal a partition from. Each run of this algorithm will steal one partition until the load is balanced.

    After the load is balanced, the expected state is:

    Min partitions per event processor = 18 / 4 = 4

    Number of Event Processors with one additional partition = 18 % 4 = 2

    i.e. 2 Event Processors will own 4 partitions and 2 will own 5 partitions.
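
The example can be checked with a small simulation. This is a sketch only: it follows the example's rule of stealing from the Event Processor that currently owns the most partitions, and it breaks ties by name rather than randomly so that the run is deterministic. The function name `steal_until_balanced` is hypothetical.

```python
from typing import Dict


def steal_until_balanced(counts: Dict[str, int], newcomer: str) -> Dict[str, int]:
    """Simulate the newcomer stealing one partition per run until it holds the minimum share."""
    counts = dict(counts)
    counts.setdefault(newcomer, 0)
    total = sum(counts.values())
    min_share = total // len(counts)
    while counts[newcomer] < min_share:
        # Victim: the other instance that owns the most partitions right now.
        # Ties are broken by name here (an assumption); the document picks randomly.
        victim = max((o for o in counts if o != newcomer),
                     key=lambda o: (counts[o], o))
        counts[victim] -= 1
        counts[newcomer] += 1
    return counts


result = steal_until_balanced({"A": 6, "B": 6, "C": 6}, "D")
```

Each loop iteration corresponds to one scheduled run of the algorithm. For the 18-partition example, the simulation ends with two processors owning 4 partitions and two owning 5.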

  4. If the partition ownership list contains N instances of Event Processors with balanced partition ownership but one of the Event Processors goes down, i.e. the last modified time is too old for all partitions owned by that Event Processor (N < P)

    a. Remove entries not modified for more than y seconds (in this case, all the partitions previously owned by the instance that went down will be filtered)

    b. There are N-1 hosts that are active now.

    c. Every instance should own at least P/(N-1) partitions and none more than (P/(N-1)) + 1. Only P % (N-1) hosts can own (P/(N-1)) + 1 partitions

    d. If the current instance owns fewer than P/(N-1) partitions (which is expected after a host goes down), there are unclaimed partitions with no owner. This instance will randomly select one of the unclaimed partitions.

    e. Claim ownership of this partition

    previously stored ETag is used to make the request to the store (the request will succeed only if the ETag in the request matches the current ETag in the store)
    sequence number is picked from the previous checkpoint updated by the previous owner
    owner id
    partition id
    owner level (set to 0 for now, may be used later)
    

    f. On successful claim, start processing the partition

    Example

    Number of partitions = 20

    Number of Event Processors = 4

    Let's assume each Event Processor is processing 5 partitions and the load is evenly distributed.

    Now, if one of the Event Processors goes down, all partitions previously owned by this Event Processor are now available. When each of the remaining Event Processors runs the algorithm, it claims whatever partition is still available until the load is balanced.

    After the load is balanced, the desired state will be:

    Min partitions per event processor = 20 / 3 = 6

    Number of Event Processors with one additional partition = 20 % 3 = 2

    i.e. 1 Event Processor will own 6 partitions and 2 will own 7 partitions.
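
Step (a), removing stale entries, can be sketched as a simple filter on the last modified time. The dictionary layout, the function names, and the concrete timestamps are assumptions made for this illustration; `expiry_seconds` stands in for the y seconds mentioned above.

```python
from typing import Dict, List, Set


def remove_expired(entries: List[Dict], now: float, expiry_seconds: float) -> List[Dict]:
    """Keep only entries modified within the last `expiry_seconds` seconds."""
    return [e for e in entries if now - e["last_modified_time"] <= expiry_seconds]


def active_owner_ids(entries: List[Dict]) -> Set[str]:
    """Unique owner ids among the given (already filtered) entries."""
    return {e["owner_id"] for e in entries}


# 20 partitions, 4 processors with 5 partitions each.
# Processor "D" (partitions 15-19) stopped updating its entries at t=100.
entries = [
    {"partition_id": str(p), "owner_id": "ABCD"[p // 5],
     "last_modified_time": 100.0 if p // 5 == 3 else 195.0}
    for p in range(20)
]
active = remove_expired(entries, now=200.0, expiry_seconds=30.0)
owned = {e["partition_id"] for e in active}
unclaimed = [str(p) for p in range(20) if str(p) not in owned]
```

After filtering, only three owners remain active and the five partitions previously owned by "D" show up as unclaimed.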

  5. If the partition ownership list contains N instances of Event Processors running but some partitions are unclaimed - can happen when multiple Event Processors are starting up around the same time and they are gradually claiming ownership of remaining partitions or the Event Hub configuration has changed and new partitions are added (N < P)

    a. Remove entries not modified for more than y seconds from the list

    b. Every instance should own at least P/N partitions and none more than (P/N) + 1. Only P % N hosts can own (P/N) + 1 partitions

    c. If there are unclaimed partitions and this instance owns <= P/N partitions, it will select one of the unclaimed partitions randomly.

    d. Claim ownership of this partition

    previously stored ETag (if available) or null ETag is used to make the request to the store (the request will succeed only if the ETag in the request matches the current ETag in the store)
    earliest sequence number of this partition (eventhub client will have this information)
    owner id
    partition id
    owner level (set to 0 for now, may be used later)
    

    e. On successful claim, start processing the partition

    Example

    Number of partitions = 20

    Number of Event Processors = 4

    Let's assume each Event Processor is processing 5 partitions and the load is evenly distributed.

    Now, the user decides to scale out their Event Hub by adding 5 new partitions i.e. we now have 25 partitions and 4 Event Processors.

    This is similar to scenario 4 where some of the partitions are unclaimed. When each Event Processor runs this algorithm, it will claim whatever unclaimed partitions are available until the load is balanced.

    After the load is balanced, the desired state will be:

    Min partitions per event processor = 25 / 4 = 6

    Number of Event Processors with one additional partition = 25 % 4 = 1

    i.e. 3 Event Processors will own 6 partitions and 1 will own 7 partitions.

Concurrency

When two instances pick the same partition to claim ownership at the same instant:

1. Both send a request to claim ownership of the partition with the previous ETag for that partition (null if there was no entry for this partition before)
2. The store will atomically try to compare and set ownership based on the ETag for that partition - this is a capability the store is expected to provide
3. One of the requests will succeed and will take ownership of the partition
4. The other request will fail and return an error
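
The compare-and-set behaviour expected from the store can be sketched with an in-memory stand-in. The class `InMemoryOwnershipStore` and its `claim` method are hypothetical. In this sketch a failed claim is signalled by returning `None` rather than an error, and a lock stands in for the store's atomicity guarantee.

```python
import threading
import uuid
from typing import Dict, Optional, Tuple


class InMemoryOwnershipStore:
    """Hypothetical store: partition id -> (owner id, ETag)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._entries: Dict[str, Tuple[str, str]] = {}

    def claim(self, partition_id: str, owner_id: str,
              expected_etag: Optional[str]) -> Optional[str]:
        """Return the new ETag on success, or None if the ETag did not match."""
        with self._lock:  # compare and set happen as one atomic step
            current = self._entries.get(partition_id)
            current_etag = current[1] if current else None
            if current_etag != expected_etag:
                return None
            new_etag = uuid.uuid4().hex  # the store generates a fresh ETag on every update
            self._entries[partition_id] = (owner_id, new_etag)
            return new_etag
```

If two instances both call `claim("3", ..., None)` for a partition with no entry, the first call succeeds and changes the ETag, so the second call no longer matches and fails.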

Updating checkpoint

When an instance wants to update the checkpoint, the store will have to ensure the owner of the partition is the one that updates the checkpoint. If an instance has lost its ownership and is trying to update the checkpoint, the update should fail and return an error. If the update is successful, the last modified time should also be updated.
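
A minimal sketch of this ownership check, assuming entries are plain dictionaries and using a hypothetical `NotOwnerError` to represent the error returned by the store:

```python
import time
from typing import Dict, Optional


class NotOwnerError(Exception):
    """Hypothetical error raised when a non-owner tries to update the checkpoint."""


def update_checkpoint(entry: Dict, owner_id: str, sequence_number: int,
                      now: Optional[float] = None) -> Dict:
    """Return an updated copy of the entry, or raise if the caller is not the owner."""
    if entry["owner_id"] != owner_id:
        raise NotOwnerError(f"{owner_id} does not own partition {entry['partition_id']}")
    updated = dict(entry)
    updated["sequence_number"] = sequence_number
    # A successful update also refreshes the last modified time.
    updated["last_modified_time"] = time.time() if now is None else now
    return updated
```

Refreshing the last modified time on every successful checkpoint is what keeps an active owner's entries from looking expired to other instances.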

Finding a partition to claim ownership

From the list retrieved from the store, filter out all the expired ownership entries.

When there are unclaimed partitions:

  • Create a set of all partitions that are actively owned
  • If the set size is not equal to the number of partitions for this Event Hub, then there are unclaimed partitions
  • Create a set of unclaimed partitions (all partitions - partitions actively owned)
  • Pick a partition from this set of unclaimed partitions

When all partitions are claimed but the load is not balanced and the current instance has capacity to own more partitions:

  • Create a map of instance id to partition count
  • Find the instances that have more than (P/N) + 1 partitions and collect their partitions into a set
  • Pick a random partition from this set to claim ownership
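
The two cases above can be sketched as a single function. Several details are assumptions: the name `find_partition_to_claim`, the input shape (a map of partition id to owner id with expired entries already removed), and the reading of "has capacity" as owning fewer than P/N partitions, where N counts this instance too. The sketch applies the (P/N) + 1 threshold literally and returns `None` when no instance exceeds it.

```python
import random
from collections import Counter
from typing import Dict, List, Optional


def find_partition_to_claim(partition_ids: List[str],
                            active_owners: Dict[str, str],
                            my_id: str) -> Optional[str]:
    """Pick an unclaimed partition if any, else one owned by an overloaded instance."""
    # Case 1: unclaimed partitions = all partitions - partitions actively owned.
    unclaimed = sorted(set(partition_ids) - set(active_owners))
    if unclaimed:
        return random.choice(unclaimed)

    # Case 2: everything is claimed; build a map of instance id -> partition count.
    counts = Counter(active_owners.values())
    instances = set(counts) | {my_id}
    min_share = len(partition_ids) // len(instances)
    if counts[my_id] >= min_share:
        return None  # this instance has no capacity for more partitions

    overloaded = {owner for owner, count in counts.items()
                  if owner != my_id and count > min_share + 1}
    candidates = sorted(p for p, owner in active_owners.items() if owner in overloaded)
    return random.choice(candidates) if candidates else None
```

With 18 partitions split evenly across three owners, a fourth instance gets a partition from one of them, since each owns 6 and the threshold is (18/4) + 1 = 5.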

Clock skews

What happens when one instance's clock is skewed and is ahead of the rest of the pack by a few minutes? That instance will think all other instances are dead and will try to claim ownership of every partition. Jeff - This is probably okay as this is relatively rare and, if it happens, there'll be just one instance running. Also, adjusting the expiration time is a trade-off between how quickly we identify a dead processor and how much tolerance we have for clock skews.
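
The effect of a skewed clock on the expiry check can be shown with a few numbers. This sketch assumes the last modified time is stamped by a correct clock and that expiry is evaluated against the local clock; `looks_expired` and all values are hypothetical.

```python
def looks_expired(last_modified_time: float, local_now: float,
                  expiry_seconds: float) -> bool:
    """An entry looks expired if it has not been modified for more than expiry_seconds."""
    return local_now - last_modified_time > expiry_seconds


true_now = 1_000.0
last_modified = true_now - 5.0  # a healthy peer updated its entry 5 seconds ago
expiry_seconds = 60.0
skew = 300.0                    # this instance's clock runs 5 minutes ahead

healthy_view = looks_expired(last_modified, true_now, expiry_seconds)
skewed_view = looks_expired(last_modified, true_now + skew, expiry_seconds)
```

An instance with a correct clock sees the entry as live, while the skewed instance sees it as expired. Raising `expiry_seconds` above the skew would hide the problem at the cost of slower detection of dead processors.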

ETag and checkpoint

1. Note down the oldOwner
2. In a while(true) loop:
	a. Update ownership with current ETag
	b. If success - break out of loop
	c. On failure, read the ownership info about this partition and update current ETag to latest ETag
	d. If this partition is now owned by another owner not equal to oldOwner, some other instance beat this instance to it - break out of loop
	e. Go to (a)
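
The loop above can be sketched as follows. `TinyStore` is a hypothetical in-memory stand-in for the real store, and `claim_with_retry` is an illustrative name; the step letters in the comments refer to the list above.

```python
import uuid
from typing import Dict, Optional, Tuple


class TinyStore:
    """Hypothetical in-memory store: partition id -> (owner id, ETag)."""

    def __init__(self) -> None:
        self.entries: Dict[str, Tuple[str, str]] = {}

    def get(self, partition_id: str) -> Optional[Tuple[str, str]]:
        return self.entries.get(partition_id)

    def try_claim(self, partition_id: str, owner_id: str, etag: Optional[str]) -> bool:
        current = self.entries.get(partition_id)
        if (current[1] if current else None) != etag:
            return False  # ETag mismatch: someone updated the entry in between
        self.entries[partition_id] = (owner_id, uuid.uuid4().hex)
        return True


def claim_with_retry(store: TinyStore, partition_id: str, my_id: str,
                     old_owner: Optional[str], etag: Optional[str]) -> bool:
    """Retry the claim while the partition is still held by old_owner."""
    while True:
        if store.try_claim(partition_id, my_id, etag):    # (a), (b)
            return True
        current = store.get(partition_id)                 # (c) refresh the ETag
        current_owner, etag = current if current else (None, None)
        if current_owner != old_owner:                    # (d) another instance won
            return False
        # (e) loop again with the refreshed ETag
```

The retry covers the case where the old owner updated its checkpoint (and so changed the ETag) between the read and the claim. If a different instance has taken over in the meantime, the loop gives up.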

Operations the store should support:

1. Get a list of current ownership status
	a. Request - Event Hub name and consumer group name
	b. Response
	List of
	- partitionId
	- owner instance id
	- sequence number, (offset?), 
	- last modified time
	- owner level
	- ETag
2. Claim ownership of a partition
	a. Request
	List of 
	- partitionId
	- owner instance id
	- sequence number
	- owner level (currently not used)
	- ETag
	b. Response
	List of successfully claimed
	- partitionId
	- owner instance id
	- sequence number, (offset?), 
	- last modified time
	- owner level
	- ETag
3. Update checkpoint
	a. Request
	- partitionId
	- owner instance id
	- sequence number
	- ETag
	b. Response
	- updated ETag
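
The three operations could be expressed as an abstract interface along these lines. This is a sketch: the class and method names are hypothetical, and the field types (for example, a float timestamp for last modified time) are assumptions.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Ownership:
    """One ownership entry; fields follow the request/response lists above."""
    partition_id: str
    owner_id: str
    sequence_number: Optional[int] = None
    offset: Optional[int] = None
    last_modified_time: Optional[float] = None  # assumed to be seconds since epoch
    owner_level: int = 0                        # currently not used
    etag: Optional[str] = None


class OwnershipStore(ABC):
    """Hypothetical interface covering the three operations listed above."""

    @abstractmethod
    def list_ownership(self, event_hub_name: str, consumer_group: str) -> List[Ownership]:
        """1. Get the list of current ownership status."""

    @abstractmethod
    def claim_ownership(self, requested: List[Ownership]) -> List[Ownership]:
        """2. Try to claim each partition; return only the successfully claimed ones."""

    @abstractmethod
    def update_checkpoint(self, partition_id: str, owner_id: str,
                          sequence_number: int, etag: str) -> str:
        """3. Update the checkpoint and return the updated ETag."""
```

Concrete stores (in-memory for tests, blob-backed for production) would subclass `OwnershipStore` and implement the compare-and-set semantics described under Concurrency.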