Options to claim partitions faster in Event Processor
@srnagar · Last active May 28, 2020

Problem Statement

In the track 2 SDK, the Event Processor client runs its load balancing loop at a fixed interval of 10 seconds. When the load is not balanced among active processors, partitions are claimed/stolen one at a time per load balancing loop, so it can take a long time for the processors to reach a balanced state. This document describes a few alternate options to speed up partition claiming and reach the balanced state sooner.

You can jump to the API extensions section below if you want to skip the options considered.

Options

I have listed a few options below. We could potentially combine two or more of them, and we could also expose them as configurable load balancing strategies.

Option 1

In each load balancing loop, a processor will try to claim its fair share of partitions all at once (a sketch of the fair-share calculation follows the pros and cons below).

Pros
  • Theoretically, each individual processor will reach its stable state in just one iteration of the load balancing loop
  • Processing for all partitions starts immediately
Cons
  • Partition ownership can jump from one processor to another until all processors come online and reach a steady state
  • If the first node that comes online claims all partitions and starts processing them, it can be overwhelmed by the load and go down immediately
  • Can cause a lot of contention when all processors try to claim ownership of the same partitions, even though only one of them can win each claim
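
A minimal sketch of the fair-share calculation this option relies on; the method and parameter names here are illustrative and not part of any proposed API.

// Illustrative only: the most partitions a single processor should own once load is balanced.
// For example, with 32 partitions and 5 processors, 32 / 5 = 6 with a remainder of 2, so two
// processors end up owning 7 partitions and the rest own 6.
static int maxFairShare(int partitionCount, int activeProcessorCount) {
    int minPartitionsPerProcessor = partitionCount / activeProcessorCount;
    int leftoverPartitions = partitionCount % activeProcessorCount;
    return leftoverPartitions == 0 ? minPartitionsPerProcessor : minPartitionsPerProcessor + 1;
}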

Option 2

Make the load balancing loop interval configurable. This lets users change the interval, giving them control over how quickly partitions are claimed.

Pros
  • When load is unbalanced, a shorter interval helps reach the balanced state sooner
  • User controls the frequency of load balancing loop
Cons
  • When load is balanced, a shorter interval leads to more frequent calls to the checkpoint store with no actual work being done
  • Takes a few iterations to reach the balanced state

Option 3

Have two different intervals for the load balancing loop: a shorter interval when the state is unbalanced and the processor needs to claim partitions, and a longer interval otherwise. Whenever the processor claims/steals a partition, the next loop runs after the shorter interval, and this continues until the processor has no more partitions to claim. Once the processor has claimed its fair share of partitions, it switches back to the longer interval. A sketch of this interval selection follows the pros and cons below.

Pros
  • When load is unbalanced, the shorter interval will help reach balanced state sooner
  • When load is balanced, the longer interval is used, so the checkpoint store is accessed less frequently
  • Still allows for nodes to converge to a balanced state without partition ownership jumping around among the active processors
Cons
  • Takes a few iterations to reach the balanced state
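
A minimal sketch of the interval selection Option 3 describes; the interval values and the method name are illustrative.

// Illustrative only: after each load balancing run, choose how long to wait before the next one.
// A run that claimed or stole a partition means the load is likely still unbalanced, so the next
// run happens after the shorter interval; otherwise the longer interval is used.
static Duration delayBeforeNextRun(boolean claimedOrStolePartition) {
    final Duration shortInterval = Duration.ofSeconds(3);  // while the processor is still claiming
    final Duration longInterval = Duration.ofSeconds(10);  // once the processor has its fair share
    return claimedOrStolePartition ? shortInterval : longInterval;
}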

Option 4

For each load balancing loop, instead of claiming a single partition, allow claiming at most x partitions at once. This approach addresses some of the cons of Option 1.

Pros
  • Lets the processor claim partitions in bulk while still limiting the count to avoid overwhelming the processor
  • Minimizes the number of ownership transfers that come with Option 1
Cons
  • Slightly complicates the implementation
  • Can increase the chances of contention with other processors

Preferred solution

The preferred solution is a combination of Options 1 and 2. Users will be able to configure the load balancing interval as well as the strategy used to claim partitions in a single load balancing execution. Two strategies will be provided to pick from:

  1. Balanced - current implementation where each execution claims at most one new partition
  2. Greedy - each execution claims the processor's entire fair share of partitions in a single load balancing cycle (see the sketch after this list)
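
A minimal sketch of how the two strategies differ within a single load balancing execution; the method and parameter names are illustrative, and only the LoadBalancingStrategy values come from the proposed API below.

// Illustrative only: how many partitions a processor attempts to claim in one
// load balancing execution under each proposed strategy.
static int partitionsToClaim(LoadBalancingStrategy strategy, int fairShare, int currentlyOwned) {
    int deficit = Math.max(fairShare - currentlyOwned, 0);
    if (strategy == LoadBalancingStrategy.GREEDY) {
        // Claim everything needed to reach the fair share in this execution.
        return deficit;
    }
    // BALANCED: current behavior, claim at most one new partition per execution.
    return Math.min(deficit, 1);
}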

API extensions

class EventProcessorClientBuilder {
   public EventProcessorClientBuilder loadBalancingUpdateInterval(Duration interval);
   public EventProcessorClientBuilder partitionOwnershipExpirationInterval(Duration interval);
   public EventProcessorClientBuilder loadBalancingStrategy(LoadBalancingStrategy strategy);
   // other existing builder options
}

enum LoadBalancingStrategy {
  BALANCED,
  GREEDY;
}

Sample user code

Balanced strategy with customized update intervals and ownership expiration duration

public void eventProcessor(CheckpointStore checkpointStore) {
    Consumer<ProcessorEvent> eventHandler = event -> {
        System.out.println("Sequence number = " + event.getEventData().getSequenceNumber());
        System.out.println("Partition id = " + event.getPartitionContext().getPartitionId());
    };

    Consumer<ProcessorErrorContext> errorHandler = errorContext -> {
        System.out.println("Partition id = " + errorContext.getPartitionContext().getPartitionId());
        System.out.println("Error message = " + errorContext.getThrowable().getMessage());
    };

    // Create an Event Processor instance that runs load balancing task every 30 seconds 
    // and uses default load balancing strategy (BALANCED).
    EventProcessorClient processorClient = new EventProcessorClientBuilder()
        .connectionString("connection-string")
        .consumerGroup("consumer-group")
        .checkpointStore(checkpointStore)
        .processEvent(eventHandler)
        .processError(errorHandler)
        .loadBalancingUpdateInterval(Duration.ofSeconds(30))
        .partitionOwnershipExpirationInterval(Duration.ofMinutes(5))
        .buildEventProcessorClient();

    processorClient.start();
    sleep(10, TimeUnit.SECONDS);
    processorClient.stop();
}

Greedy strategy with default update interval and ownership expiration duration

public void eventProcessor(CheckpointStore checkpointStore) {
    Consumer<ProcessorEvent> eventHandler = event -> {
        System.out.println("Sequence number = " + event.getEventData().getSequenceNumber());
        System.out.println("Partition id = " + event.getPartitionContext().getPartitionId());
    };

    Consumer<ProcessorErrorContext> errorHandler = errorContext -> {
        System.out.println("Partition id = " + errorContext.getPartitionContext().getPartitionId());
        System.out.println("Error message = " + errorContext.getThrowable().getMessage());
    };

    // Create an Event Processor instance that uses GREEDY load balancing strategy.
    EventProcessorClient processorClient = new EventProcessorClientBuilder()
        .connectionString("connection-string")
        .consumerGroup("consumer-group")
        .checkpointStore(checkpointStore)
        .processEvent(eventHandler)
        .processError(errorHandler)
        .loadBalancingStrategy(LoadBalancingStrategy.GREEDY)
        .buildEventProcessorClient();

    processorClient.start();
    sleep(10, TimeUnit.SECONDS);
    processorClient.stop();
}
chradek commented Apr 21, 2020

For the following under Preferred solution:

Greedy - each execution claims its fair share of partitions all at once

I take this to mean that if a processor starts up and thinks it is the only processor, it will grab every partition. If there's already one processor running, it will only grab half the partitions...is that right?

Also, what are your thoughts on making the LoadBalancerStrategy an interface so customers can write their own, or use one of the ones we provide? In JavaScript we sort of have this today:
https://github.com/Azure/azure-sdk-for-js/blob/b70b6abb2774529329ac1e14f005b651708577e0/sdk/eventhub/event-hubs/src/partitionLoadBalancer.ts#L13-L26

We have 2 classes that implement that interface, though neither the implementations nor the interface are exposed to users today.

srnagar (Author) commented Apr 21, 2020

I take this to mean that if a processor starts up and thinks it is the only processor, it will grab every partition. If there's already one processor running, it will only grab half the partitions...is that right?

Yes, that is right!

Also, what are your thoughts on making the LoadBalancerStrategy an interface so customers can write their own, or use one of the ones we provide? In JavaScript we sort of have this today:

That's an option, but so far we have only considered opinionated load balancing. If we want to make this like the checkpoint store, where users can bring their own implementation, we need to discuss this option.

chradek commented Apr 21, 2020

What if for JavaScript, we let the user pass in one of the strategies we defined, rather than an interface?

interface EventHubConsumerClientOptions {
  loadBalancingIntervalInSeconds?: number;
  loadBalancingStrategy?: ConservativeLoadBalancer | GreedyLoadBalancer | AggressiveLoadBalancer;
}

This would force the user to use one of the strategies we define, but makes it really easy to open up in the future (we just change the type to the interface they all extend instead of the specific classes).

We can go the enum route (well, string union in JavaScript's case), but it makes it harder to support a BYO-strategy in the future if we want to.

Also, the expectation is that these are passed as client-level options, right, and not on the subscribe/receive call? Basically, they are only relevant if the checkpoint store is also provided?

YijunXieMS commented Apr 22, 2020

After discussion with Srikanta, I would suggest two changes to the API and internal implementation.

  1. In the aggressive and greedy strategies, if multiple event processors are started within a short time, a partition might float multiple times from one event processor to another. To mitigate this side effect, an event processor only aggressively or greedily claims available (unclaimed or expired) partitions; it still steals only one partition per cycle. All partitions start being processed within a short time and there isn't too much rebalancing, but it takes longer to balance partitions among all event processors.
  2. The greedy strategy is a special case of the aggressive strategy where the percentage is 100%. We could allow users to choose any number or percentage they want; then we don't need named strategies and users have more control. If a user plans to use 8 event processors for 32 partitions, they can set the number to 4 (or 12.5%). If another user plans on 4 event processors, it may be better to set the number to 8 (or 25%). A fixed percentage of 20% might not be optimal. A sketch of this calculation is shown below.
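
A minimal sketch of that calculation, assuming a hypothetical partitionClaimPercent setting expressed against the total partition count:

// Illustrative only: turn a user-supplied percentage of the total partition count
// into the number of partitions to claim per load balancing cycle.
// e.g. 32 partitions at 12.5% -> 4 per cycle; 32 partitions at 25% -> 8 per cycle.
static int partitionsToClaimPerCycle(int totalPartitionCount, double partitionClaimPercent) {
    int count = (int) Math.ceil(totalPartitionCount * partitionClaimPercent / 100.0);
    return Math.max(count, 1); // always make at least some progress each cycle
}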

jsquire commented May 1, 2020

Summary

I agree with the high-level feature set that seems to be the majority vote, though I have a few different thoughts on the terminology and potential meaning.

Allow the load balancing interval to be controlled via options

I think this would be best as two distinct values:

  • Load Balancing Interval: The amount of time between load balancing cycles.
  • Partition Ownership Length: The amount of time to consider each partition as owned.

Accept a strategy that influences the behavior used by the processor for load balancing

I'd suggest the following, based on a lot of iteration, feedback, and testing of similar terms for other similar "strategy" types:

  • Balanced (default): The current approach, named to be the naturally perceived "middle of the road" value. Claim 1 each cycle, wait for the interval to elapse.
  • Eager (you call this aggressive): Claim 1, immediately start the next cycle, repeating for up to 25% of the current partitions (where 25% is a POOMA value)
  • Greedy (you call this greedy): Claim 1, immediately start the next cycle, repeating up to the maximum number of partitions or until all are marked as owned

The interesting question is whether a cycle that fails to claim the partition satisfies the iterations for eager and greedy, or we don't count that iteration as complete until the claim is successful, no matter how many load balancing cycles it takes.

For .NET, I'm picturing it as something like:

public enum LoadBalancingStrategy
{
    Balanced,
    Eager,
    Greedy 
}

public class EventProcessorOptions
{
    public TimeSpan LoadBalancingUpdateInterval { get; set; }
    public TimeSpan PartitionOwnershipExpirationInterval { get; set; }
    public LoadBalancingStrategy LoadBalancingStrategy { get; set; } = LoadBalancingStrategy.Balanced;
    
    // Other things here....
    public EventHubConnectionOptions ConnectionOptions { get; set; }
    public EventHubsRetryOptions RetryOptions { get; set; }
    public TimeSpan? MaximumWaitTime { get; set; }
    public int PrefetchCount { get; set; }
    public string Identifier { get; set; }
    public bool TrackLastEnqueuedEventProperties { get; set; } = true;
    public EventPosition DefaultStartingPosition { get; set; } = EventPosition.Earliest;
}

The names would likely be tweaked based on conversations with the architects, but I think the structure holds.

ramya-rao-a commented

Below are the options configurable in Track 1 EPH (at least in JS). I want to ensure that we have understood what these are before finalizing what knobs we want to have in Track 2.

  /**
   * @property {number} [leaseRenewInterval] The sleep interval **`in seconds`** between scans.
   * Default: **`10` seconds**.
   *
   * Allows a lease manager implementation to specify to PartitionManager how often it should
   * scan leases and renew them. In order to redistribute leases in a timely fashion after a host
   * ceases operating, we recommend a relatively short interval, such as ten seconds. Obviously it
   * should be less than half of the lease length, to prevent accidental expiration.
   *
   * If `Leasemanager` is provided when creating the EventProcessorHost then this value will be ignored.
   */
  leaseRenewInterval?: number;
  /**
   * @property {number} [leaseDuration] Duration of a lease **`in seconds`** before it expires
   * unless renewed. Default: **`30` seconds**, Min Value: **`15` seconds**,
   * Max value: **`60` seconds**.
   *
   * If `Leasemanager` is provided when creating the EventProcessorHost then this value will be ignored.
   */
  leaseDuration?: number;
  /**
   * @property {number} [startupScanDelay] The delay time **`in seconds`** between the first scan
   * for available partitions and the second. This is part of a startup optimization which allows
   * individual hosts to become visible to other hosts, and thereby get a more accurate count
   * of the number of hosts in the system, before they try to estimate how many partitions they
   * should own. Default: **`30` seconds**.
   */
  startupScanDelay?: number;
  /**
   * @property {number} [fastScanInterval] There are two possible interval times between scans for
   * available partitions, fast and slow. The fast (short) interval **`in seconds`** is used after
   * a scan in which lease stealing has occurred, to promote quicker rebalancing.
   * Default: **`3` seconds**.
   */
  fastScanInterval?: number;
  /**
   * @property {number} [slowScanInterval] The slow (long) interval **`in seconds`** is used
   * after a scan in which lease stealing did not occur, to reduce unnecessary scanning when
   * the system is in steady state. Default: **`5` seconds**.
   */
  slowScanInterval?: number;

jsquire commented May 15, 2020

I've been mulling this over a bit, and I find myself agreeing with the thought that surfaced earlier: having a strategy-type enumeration makes it a challenge to allow more fine-grained tuning in the future, because the naming becomes really tricky.

The more I think about it, the more the Partition Claim Percent concept that Srikanta, Yijun, and others were proposing makes sense. The semantics for it aren't super-explicit, which gives us a bit of flexibility in how we implement and evolve it in the future, while still allowing callers to be more expressive than the enumeration and giving us a magnitude for "how aggressive I want to be."

Taking that into account, I'm thinking it may make sense to start off with accepting the percent with the semantics of "XX percent of the number of partitions that I am expected to own" (not the total number of partitions in the system), and to use the ceiling of the percent calculation as the number of "credits" that are used to skip the delay interval and immediately execute another cycle (a sketch of this calculation follows the example below).

For example, let's say that my processor is working in a space where its fair share is 6 partitions.

  • When the percentage is between 0 and 16%: the value is 1, indicating that the current implementation is used. Claim 1, wait for the interval to elapse.
  • When the percentage is between 17 and 33%: the value is 2. Claim 1, do not wait and immediately run the next load balancing cycle. After two cycles, return to the normal 1-then-wait pattern.
  • When the percentage is between 34 and 50%: the value is 3. Claim 1, do not wait and immediately run the next cycle without delay for the first 3 attempts, then return to the normal pattern.

... and so on.
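
A minimal sketch of that credit calculation, with illustrative names:

// Illustrative only: translate a claim percentage into "credits", i.e. the number of
// consecutive load balancing cycles that run immediately without waiting for the interval.
// The percentage is relative to the processor's expected fair share, not the total partition count.
// e.g. with a fair share of 6: 16% -> 1 credit, 25% -> 2 credits, 50% -> 3 credits.
static int claimCredits(int fairShare, int partitionClaimPercent) {
    return Math.max(1, (int) Math.ceil(fairShare * partitionClaimPercent / 100.0));
}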

For .NET, I'm picturing it as something like:

public class EventProcessorOptions
{
    public TimeSpan LoadBalancingUpdateInterval { get; set; }
    public TimeSpan PartitionOwnershipExpirationInterval { get; set; }
    public int PartitionClaimPercent { get; set; } = 25;
    
    // Other things here....
    public EventHubConnectionOptions ConnectionOptions { get; set; }
    public EventHubsRetryOptions RetryOptions { get; set; }
    public TimeSpan? MaximumWaitTime { get; set; }
    public int PrefetchCount { get; set; }
    public string Identifier { get; set; }
    public bool TrackLastEnqueuedEventProperties { get; set; } = true;
    public EventPosition DefaultStartingPosition { get; set; } = EventPosition.Earliest;
}

That all said, I'm not a fan of the name PartitionClaimPercent but I haven't been able to think of a better one. I'm hoping that someone has some awesome inspiration there...

chradek commented May 28, 2020

Sample user code

Balanced strategy with customized update intervals and ownership expiration duration

const client = new EventHubConsumerClient(
  "consumerGroup",
  "connectionString",
  checkpointStore,
  {
    loadBalancingOptions: {
      strategy: "balanced",
      updateIntervalInMs: 30000,
      partitionOwnershipExpirationIntervalInMs: 300000
    }
  }
);

client.subscribe({
  processEvents(events, context) {
    // Your code to process list of events
  },
  processError(error, context) {
    // Your code to handle the error
  }
});

Greedy strategy with default update interval and ownership expiration duration

const client = new EventHubConsumerClient(
  "consumerGroup",
  "connectionString",
  checkpointStore,
  {
    loadBalancingOptions: {
      strategy: "greedy"
    }
  }
);

client.subscribe({
  processEvents(events, context) {
    // Your code to process list of events
  },
  processError(error, context) {
    // Your code to handle the error
  }
});

YijunXieMS commented May 28, 2020

Sample user code

Balanced strategy with customized update intervals and ownership expiration duration

def on_event(partition_context, event):
    # Put your code here.

def on_error(partition_context, error):
    # put your code here

consumer_client = EventHubConsumerClient.from_connection_string("CONNECTION_STR", "CONSUMER_GROUP", 
    eventhub_name=EVENTHUB_NAME,
    load_balancing_strategy=LoadBalancingStrategy.BALANCED,
    load_balancing_interval=10,
    partition_ownership_expiration_interval=60
)

with consumer_client:
    consumer_client.receive(
        on_event=on_event,
        on_error=on_error,
    )

Greedy strategy with default update interval and ownership expiration duration

def on_event(partition_context, event):
    # Put your code here.

def on_error(partition_context, error):
    # put your code here

consumer_client = EventHubConsumerClient.from_connection_string("CONNECTION_STR", "CONSUMER_GROUP", 
    eventhub_name=EVENTHUB_NAME,
    load_balancing_strategy=LoadBalancingStrategy.GREEDY,
)

with consumer_client:
    consumer_client.receive(
        on_event=on_event,
        on_error=on_error,
    )

jsquire commented May 28, 2020

Sample user code

Balanced strategy with customized update intervals and ownership expiration duration

var connectionString = "<< EVENT HUB CONNECTION STRING FROM PORTAL >>";
var consumerGroup = "$DEFAULT";
var checkpointStore = new BlobContainerClient(...);

var options = new EventProcessorClientOptions
{
    LoadBalancingStrategy = LoadBalancingStrategy.Balanced,
    PartitionOwnershipExpirationInterval = TimeSpan.FromMinutes(3),
    LoadBalancingUpdateInterval = TimeSpan.FromSeconds(5)
};

var processor = new EventProcessorClient(checkpointStore, consumerGroup, connectionString, options);
processor.ProcessEventAsync += async (processingEventArgs) => ...
processor.ProcessErrorAsync += async (processingErrorEventArgs) => ...

await processor.StartProcessingEventsAsync();

Greedy strategy with default load balancing intervals

var connectionString = "<< EVENT HUB CONNECTION STRING FROM PORTAL >>";
var consumerGroup = "$DEFAULT";
var checkpointStore = new BlobContainerClient(...);

var options = new EventProcessorClientOptions
{
    LoadBalancingStrategy = LoadBalancingStrategy.Greedy
};

var processor = new EventProcessorClient(checkpointStore, consumerGroup, connectionString, options);
processor.ProcessEventAsync += async (processingEventArgs) => ...
processor.ProcessErrorAsync += async (processingErrorEventArgs) => ...

await processor.StartProcessingEventsAsync();

Balanced strategy with default load balancing intervals

var connectionString = "<< EVENT HUB CONNECTION STRING FROM PORTAL >>";
var consumerGroup = "$DEFAULT";
var checkpointStore = new BlobContainerClient(...);

var processor = new EventProcessorClient(checkpointStore, consumerGroup, connectionString);
processor.ProcessEventAsync += async (processingEventArgs) => ...
processor.ProcessErrorAsync += async (processingErrorEventArgs) => ...

await processor.StartProcessingEventsAsync();
