In the track 2 SDK for the Event Processor client, the load balancing loop runs at a fixed interval of 10 seconds. When the load is not balanced among active processors, partitions are claimed or stolen one at a time per load balancing loop, so it can take a long time for the processors to reach a balanced state. This document describes a few alternative options to speed up partition claiming so that a balanced state is reached sooner.
You can jump to the proposed API section if you want to skip all the options considered.
I have listed a few options below; we could potentially combine two or more of them, and also expose them as configurable load balancing strategies.
Option 1: In each load balancing loop, a processor tries to claim its fair share of partitions all at once.
Pros:
- Theoretically, each processor reaches its stable state in just one iteration of the load balancing loop
- Processing for all partitions starts immediately

Cons:
- Partition ownership can jump from one processor to another until all processors come online and reach a steady state
- If the first node that comes online claims all partitions and starts processing them, it can be overwhelmed by the load and go down immediately
- Can cause a lot of contention for ownership when all processors try to claim the same partitions, even though only one of them will win
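The claim-all-at-once idea can be sketched as below. This is an illustrative sketch, not the SDK's internal implementation; `fairShare` and `partitionsToClaim` are hypothetical helper names.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of Option 1: claim the processor's entire fair share in one loop.
class GreedyClaimSketch {
    // A processor's fair share: total partitions divided among active
    // processors, rounded up so every partition is covered.
    static int fairShare(int totalPartitions, int activeProcessors) {
        return (totalPartitions + activeProcessors - 1) / activeProcessors;
    }

    // In a single load balancing loop, pick enough unowned partitions to
    // bring this processor up to its fair share, instead of just one.
    static List<String> partitionsToClaim(List<String> unowned, int currentlyOwned,
                                          int totalPartitions, int activeProcessors) {
        int target = fairShare(totalPartitions, activeProcessors);
        int needed = Math.max(0, target - currentlyOwned);
        return new ArrayList<>(unowned.subList(0, Math.min(needed, unowned.size())));
    }
}
```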
Option 2: Make the load balancing loop interval configurable. This lets users change the interval, giving them control over how quickly partitions are claimed.
Pros:
- When load is unbalanced, a shorter interval helps reach a balanced state sooner
- The user controls the frequency of the load balancing loop

Cons:
- When load is balanced, a shorter interval leads to more frequent calls to the checkpoint store with no work actually being done
- Takes a few iterations before reaching a balanced state
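To make the effect of the interval concrete, here is a back-of-the-envelope estimate of the time a newly started processor needs to converge, assuming (as in the current implementation) at most one partition is claimed per loop. The numbers and helper are illustrative, not SDK code.

```java
import java.time.Duration;

// Rough convergence estimate: a new processor needs about one load balancing
// loop per partition in its fair share, so time-to-balance scales linearly
// with the loop interval.
class ConvergenceEstimate {
    static Duration timeToBalance(int totalPartitions, int activeProcessors,
                                  Duration loopInterval) {
        int fairShare = (totalPartitions + activeProcessors - 1) / activeProcessors;
        return loopInterval.multipliedBy(fairShare);
    }
}
```

For example, with 32 partitions and 4 processors (fair share of 8), the default 10-second interval gives roughly 80 seconds to converge, while a 2-second interval cuts that to about 16 seconds.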
Option 3: Use two different intervals for the load balancing loop: a shorter interval when the state is unbalanced and the processor needs to claim partitions, and a longer interval otherwise. If the processor claimed or stole a partition in the previous loop, the next loop runs sooner, and it keeps running at the shorter interval until the processor has no more partitions to claim. Once the processor has claimed its fair share of partitions, it switches back to the longer interval.
Pros:
- When load is unbalanced, the shorter interval helps reach a balanced state sooner
- When load is balanced, the longer interval is used, so the checkpoint store is accessed less frequently
- Still allows nodes to converge to a balanced state without partition ownership jumping around among the active processors

Cons:
- Takes a few iterations before reaching a balanced state
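The two-interval rule can be sketched as a simple delay selector; the class, method, and interval values below are illustrative, not the SDK's actual internals.

```java
import java.time.Duration;

// Sketch of Option 3: run the next load balancing loop sooner whenever the
// previous loop claimed or stole a partition; fall back to the longer
// interval once ownership is stable.
class DualIntervalSketch {
    static final Duration SHORT_INTERVAL = Duration.ofSeconds(2);
    static final Duration LONG_INTERVAL = Duration.ofSeconds(10);

    static Duration nextDelay(boolean claimedPartitionInLastLoop) {
        return claimedPartitionInLastLoop ? SHORT_INTERVAL : LONG_INTERVAL;
    }
}
```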
Option 4: In each load balancing loop, instead of claiming a single partition, allow claiming at most x partitions at once. This approach addresses some of the cons of Option 1.
Pros:
- Lets the processor claim partitions in bulk while still limiting the count to a smaller number, to avoid tripping the processor over
- Minimizes the number of ownership transfers that come with Option 1

Cons:
- Slightly complicates the implementation
- Can increase the chances of contention with other processors
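The cap on claims per loop amounts to a small calculation; the helper below is a hypothetical sketch of that logic, not SDK code.

```java
// Sketch of Option 4: bulk-claim like Option 1, but cap the number of
// partitions claimed in a single loop so one processor is not suddenly
// overwhelmed by owning everything.
class CappedClaimSketch {
    static int claimCount(int fairShare, int currentlyOwned, int maxClaimsPerLoop) {
        int needed = Math.max(0, fairShare - currentlyOwned);
        return Math.min(needed, maxClaimsPerLoop);
    }
}
```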
The preferred solution is a combination of Options 1 and 2. Users will have the option to configure the load balancing interval as well as the strategy used to claim partitions in a single load balancing execution. Users will be provided with two strategies to pick from:
- Balanced - the current implementation, where each execution claims at most one new partition
- Greedy - each execution claims the processor's fair share of partitions in a single load balancing cycle
```java
public class EventProcessorClientBuilder {
    public EventProcessorClientBuilder loadBalancingUpdateInterval(Duration interval);
    public EventProcessorClientBuilder partitionOwnershipExpirationInterval(Duration interval);
    public EventProcessorClientBuilder loadBalancingStrategy(LoadBalancingStrategy strategy);
    // other existing builder options
}

public enum LoadBalancingStrategy {
    BALANCED,
    GREEDY;
}
```
```java
public void eventProcessor(CheckpointStore checkpointStore) {
    Consumer<ProcessorEvent> eventHandler = event -> {
        System.out.println("Sequence number = " + event.getEventData().getSequenceNumber());
        System.out.println("Partition id = " + event.getPartitionContext().getPartitionId());
    };

    Consumer<ProcessorErrorContext> errorHandler = errorContext -> {
        System.out.println("Partition id = " + errorContext.getPartitionContext().getPartitionId());
        System.out.println("Error message = " + errorContext.getThrowable().getMessage());
    };

    // Create an Event Processor instance that runs the load balancing task every 30 seconds
    // and uses the default load balancing strategy (BALANCED).
    EventProcessorClient processorClient = new EventProcessorClientBuilder()
        .connectionString("connection-string")
        .consumerGroup("consumer-group")
        .checkpointStore(checkpointStore)
        .processEvent(eventHandler)
        .processError(errorHandler)
        .loadBalancingUpdateInterval(Duration.ofSeconds(30))
        .partitionOwnershipExpirationInterval(Duration.ofMinutes(5))
        .buildEventProcessorClient();

    processorClient.start();
    sleep(10, TimeUnit.SECONDS);
    processorClient.stop();
}
```
```java
public void eventProcessor(CheckpointStore checkpointStore) {
    Consumer<ProcessorEvent> eventHandler = event -> {
        System.out.println("Sequence number = " + event.getEventData().getSequenceNumber());
        System.out.println("Partition id = " + event.getPartitionContext().getPartitionId());
    };

    Consumer<ProcessorErrorContext> errorHandler = errorContext -> {
        System.out.println("Partition id = " + errorContext.getPartitionContext().getPartitionId());
        System.out.println("Error message = " + errorContext.getThrowable().getMessage());
    };

    // Create an Event Processor instance that uses the GREEDY load balancing strategy.
    EventProcessorClient processorClient = new EventProcessorClientBuilder()
        .connectionString("connection-string")
        .consumerGroup("consumer-group")
        .checkpointStore(checkpointStore)
        .processEvent(eventHandler)
        .processError(errorHandler)
        .loadBalancingStrategy(LoadBalancingStrategy.GREEDY)
        .buildEventProcessorClient();

    processorClient.start();
    sleep(10, TimeUnit.SECONDS);
    processorClient.stop();
}
```
I've been mulling this over a bit, and I find myself agreeing with the thought that having a strategy-type enumeration makes it a challenge to allow for more fine-grained tuning in the future, because the naming becomes really tricky.
The more that I think about it, the more the Partition Claim Percent concept that Srikanta, Yijun, and others were proposing makes sense. The semantics for it aren't super-explicit, which gives us a bit of flexibility for how we implement and evolve in the future, while still allowing callers to be more expressive than the enumeration and give us a magnitude for "how aggressive I want to be."

Taking that into account, I'm thinking that it may make sense to start off with accepting the percent with the semantics of "XX percent of the number of partitions that I am expected to own" (not the total number of partitions in the system) and use the ceiling of the percent calculation as the number of "credits" that are used to skip the delay interval and immediately execute another cycle.
For example, let's say that my processor is working in a space where its fair share is 6 partitions.
... and so on.
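As a rough sketch, the credit calculation described above might look like the following; the class and method names are hypothetical, chosen just to illustrate the percent-of-fair-share semantics.

```java
// Sketch of the proposed "partition claim percent" semantics: the percent
// applies to the processor's own fair share (not the total partition count),
// and the ceiling of that calculation gives the number of "credits" used to
// skip the delay interval and immediately run another load balancing cycle.
class ClaimPercentSketch {
    static int credits(int fairShare, double claimPercent) {
        return (int) Math.ceil(fairShare * claimPercent / 100.0);
    }
}
```

With a fair share of 6 partitions, a claim percent of 50 yields 3 credits, while 100 yields 6, approximating greedy behavior.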
For .NET, I'm picturing it as something like:
That all said, I'm not a fan of the name PartitionClaimPercent, but I haven't been able to think of a better one. I'm hoping that someone has some awesome inspiration there...