Options to claim partitions faster in Event Processor

Problem Statement

In the track 2 SDK for the Event Processor client, the load balancing loop runs at a fixed interval of 10 seconds. When the load is not balanced among active processors, partitions are claimed or stolen one at a time per load balancing loop, so it can take a long time for the processors to reach a balanced state. This document describes a few alternate options to speed up partition claiming so that the balanced state is reached sooner.

You can jump to the proposed API section if you want to skip all the options considered.

Options

I have listed a few options below. Two or more of them could potentially be combined, and they could also be exposed as configurable load balancing strategies.

Option 1

In each load balancing loop, a processor will try to claim its fair share of partitions all at once (see the sketch after this list).

Pros
  • Theoretically, each individual processor will reach its stable state in just one iteration of the load balancing loop
  • Processing for all partitions starts immediately
Cons
  • Partition ownership can jump from one processor to another until all processors come online and reach a steady state
  • If the first node that comes online claims all partitions and starts processing them, it can be overwhelmed by the load and go down immediately
  • Can cause a lot of contention for ownership when all processors try to claim the same partitions, even though only one of them can win each claim
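
A minimal sketch of what a single Option 1 iteration could look like. This is not the actual implementation; the helpers (computeUnownedOrExpiredPartitions, claimOwnership) and the variables holding the partition count and active processor count are assumed for illustration.

// Sketch only: in one iteration, compute the fair share and try to claim every missing partition.
int fairShare = (int) Math.ceil((double) allPartitionIds.size() / activeProcessorCount);
int deficit = fairShare - partitionsOwnedByThisProcessor;
List<String> candidates = computeUnownedOrExpiredPartitions(currentOwnership); // hypothetical helper
for (String partitionId : candidates) {
    if (deficit <= 0) {
        break;
    }
    claimOwnership(partitionId); // hypothetical helper; a competing processor's claim may still win
    deficit--;
}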

Option 2

Make the load balancing loop interval configurable. This lets users change the interval, giving them control over how quickly partitions are claimed (a rough worked example follows the list below).

Pros
  • When load is unbalanced, a shorter interval will help reach the balanced state sooner
  • User controls the frequency of load balancing loop
Cons
  • When load is balanced, a shorter interval will lead to more frequent calls to the checkpoint store with no actual work being done
  • Takes a few iterations before reaching the balanced state
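
As a rough back-of-the-envelope illustration of why the interval matters (example numbers assumed, not taken from the document): with one claim per iteration, the time to reach the fair share grows linearly with the interval.

// Illustration only, with assumed example values.
Duration interval = Duration.ofSeconds(10);   // the current fixed interval from the problem statement
int partitions = 32;                          // assumed partition count
int processors = 4;                           // assumed number of active processors
int claimsNeeded = partitions / processors;   // 8 partitions per processor
Duration timeToBalance = interval.multipliedBy(claimsNeeded); // ~80 seconds; halving the interval halves this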

Option 3

Have two different intervals for the load balancing loop: a shorter one while the state is unbalanced and the processor still needs to claim partitions, and a longer one once it has claimed its fair share. If the processor claimed or stole a partition in the current iteration, the next iteration runs after the shorter interval; once the processor has no more partitions to claim, it switches back to the longer interval (see the sketch after this list).

Pros
  • When load is unbalanced, the shorter interval will help reach the balanced state sooner
  • When load is balanced, the longer interval will be used, so the checkpoint store is accessed less frequently
  • Still allows for nodes to converge to a balanced state without partition ownership jumping around among the active processors
Cons
  • Takes a few iterations before reaching the balanced state
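
A minimal sketch of how the dual-interval scheduling could be wired, with both interval values and the helper names assumed for illustration:

// Sketch only: choose the delay for the next iteration based on whether this one claimed a partition.
Duration shortInterval = Duration.ofSeconds(3);   // assumed value, used while partitions remain to claim
Duration longInterval = Duration.ofSeconds(30);   // assumed value, used once the fair share is owned
boolean claimedThisIteration = runLoadBalancingIteration(); // hypothetical method
Duration nextDelay = claimedThisIteration ? shortInterval : longInterval;
scheduler.schedule(this::loadBalancingLoop, nextDelay.toMillis(), TimeUnit.MILLISECONDS); // assumed ScheduledExecutorService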

Option 4

For each load balancing loop, instead of claiming a single partition, allow claiming at most x partitions at once. This approach addresses some of the cons of Option 1 (see the sketch after this list).

Pros
  • This lets the processor claim partitions in bulk while still limiting the count to avoid overwhelming the processor
  • Minimizes the number of ownership transfers that come with Option 1
Cons
  • Slightly complicates the implementation
  • Can increase the chances of contention with other processors
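
Option 4 only changes the per-iteration cap relative to the Option 1 sketch above. A hedged variant, with maxPartitionsPerCycle as an assumed configuration value:

// Sketch only: claim up to maxPartitionsPerCycle partitions per iteration instead of the full deficit.
int deficit = Math.max(fairShare - partitionsOwnedByThisProcessor, 0);
int claimsThisCycle = Math.min(maxPartitionsPerCycle, deficit);
for (int i = 0; i < claimsThisCycle && i < candidates.size(); i++) {
    claimOwnership(candidates.get(i)); // hypothetical helper, as in the Option 1 sketch
}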

Preferred solution

The preferred solution is a combination of Options 1 and 2. Users will have the option to configure the load balancing interval as well as the strategy used to claim partitions in a single load balancing execution. Users will be provided with two strategies to pick from (a sketch of the difference follows the list).

  1. Balanced - the current implementation, where each execution claims at most one new partition
  2. Greedy - each execution claims the processor's entire fair share of partitions in a single load balancing execution
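
One way to look at the two strategies is that they differ only in how many partitions a single load balancing execution is allowed to claim. A hedged sketch of that mapping, not the actual implementation:

// Sketch only: the strategy determines the per-execution claim budget.
int claimBudget;
switch (strategy) {
    case BALANCED:
        claimBudget = 1; // at most one new partition per execution
        break;
    case GREEDY:
        claimBudget = Math.max(fairShare - partitionsOwnedByThisProcessor, 0); // remaining fair share
        break;
    default:
        claimBudget = 1;
        break;
}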

API extensions

public class EventProcessorClientBuilder {
   public EventProcessorClientBuilder loadBalancingUpdateInterval(Duration interval);
   public EventProcessorClientBuilder partitionOwnershipExpirationInterval(Duration interval);
   public EventProcessorClientBuilder loadBalancingStrategy(LoadBalancingStrategy strategy);
   // other existing builder options
}

public enum LoadBalancingStrategy {
  BALANCED,
  GREEDY;
}

Sample user code

Balanced strategy with customized update intervals and ownership expiration duration

public void eventProcessor(CheckpointStore checkpointStore) throws InterruptedException {
    Consumer<ProcessorEvent> eventHandler = event -> {
        System.out.println("Sequence number = " + event.getEventData().getSequenceNumber());
        System.out.println("Partition id = " + event.getPartitionContext().getPartitionId());
    };

    Consumer<ProcessorErrorContext> errorHandler = errorContext -> {
        System.out.println("Partition id = " + errorContext.getPartitionContext().getPartitionId());
        System.out.println("Error message = " + errorContext.getThrowable().getMessage());
    };

    // Create an Event Processor instance that runs load balancing task every 30 seconds 
    // and uses default load balancing strategy (BALANCED).
    EventProcessorClient processorClient = new EventProcessorClientBuilder()
        .connectionString("connection-string")
        .consumerGroup("consumer-group")
        .checkpointStore(checkpointStore)
        .processEvent(eventHandler)
        .processError(errorHandler)
        .loadBalancingUpdateInterval(Duration.ofSeconds(30))
        .partitionOwnershipExpirationInterval(Duration.ofMinutes(5))
        .buildEventProcessorClient();

    processorClient.start();
    TimeUnit.SECONDS.sleep(10);
    processorClient.stop();
}

Greedy strategy with default update interval and ownership expiration duration

public void eventProcessor(CheckpointStore checkpointStore) throws InterruptedException {
    Consumer<ProcessorEvent> eventHandler = event -> {
        System.out.println("Sequence number = " + event.getEventData().getSequenceNumber());
        System.out.println("Partition id = " + event.getPartitionContext().getPartitionId());
    };

    Consumer<ProcessorErrorContext> errorHandler = errorContext -> {
        System.out.println("Partition id = " + errorContext.getPartitionContext().getPartitionId());
        System.out.println("Error message = " + errorContext.getThrowable().getMessage());
    };

    // Create an Event Processor instance that uses GREEDY load balancing strategy.
    EventProcessorClient processorClient = new EventProcessorClientBuilder()
        .connectionString("connection-string")
        .consumerGroup("consumer-group")
        .checkpointStore(checkpointStore)
        .processEvent(eventHandler)
        .processError(errorHandler)
        .loadBalancingStrategy(LoadBalancingStrategy.GREEDY)
        .buildEventProcessorClient();

    processorClient.start();
    TimeUnit.SECONDS.sleep(10);
    processorClient.stop();
}
@YijunXieMS commented May 28, 2020

Sample user code

Balanced strategy with customized update intervals and ownership expiration duration

def on_event(partition_context, event):
    # Put your code here.
    pass

def on_error(partition_context, error):
    # Put your code here.
    pass

consumer_client = EventHubConsumerClient.from_connection_string("CONNECTION_STR", "CONSUMER_GROUP", 
    eventhub_name=EVENTHUB_NAME,
    load_balancing_strategy=LoadBalancingStrategy.BALANCED,
    load_balancing_interval=10,
    partition_ownership_expiration_interval=60
)

with consumer_client:
    consumer_client.receive(
        on_event=on_event,
        on_error=on_error,
    )

Greedy strategy with default update interval and ownership expiration duration

def on_event(partition_context, event):
    # Put your code here.
    pass

def on_error(partition_context, error):
    # Put your code here.
    pass

consumer_client = EventHubConsumerClient.from_connection_string("CONNECTION_STR", "CONSUMER_GROUP", 
    eventhub_name=EVENTHUB_NAME,
    load_balancing_strategy=LoadBalancingStrategy.GREEDY,
)

with consumer_client:
    consumer_client.receive(
        on_event=on_event,
        on_error=on_error,
    )

@jsquire commented May 28, 2020

Sample user code

Balanced strategy with customized update intervals and ownership expiration duration

var connectionString = "<< EVENT HUB CONNECTION STRING FROM PORTAL >>";
var consumerGroup = "$DEFAULT";
var checkpointStore = new BlobContainerClient(...);

var options = new EventProcessorClientOptions
{
    LoadBalancingStrategy = LoadBalancingStrategy.Balanced,
    PartitionOwnershipExpirationInterval = TimeSpan.FromMinutes(3),
    LoadBalancingUpdateInterval = TimeSpan.FromSeconds(5)
};

var processor = new EventProcessorClient(checkpointStore, consumerGroup, connectionString, options);
processor.ProcessEventAsync += async (processingEventArgs) => ...
processor.ProcessErrorAsync += async (processingErrorEventArgs) => ...

await processor.StartProcessingEventsAsync();

Greedy strategy with default load balancing intervals

var connectionString = "<< EVENT HUB CONNECTION STRING FROM PORTAL >>";
var consumerGroup = "$DEFAULT";
var checkpointStore = new BlobContainerClient(...);

var options = new EventProcessorClientOptions
{
    LoadBalancingStrategy = LoadBalancingStrategy.Greedy
};

var processor = new EventProcessorClient(checkpointStore, consumerGroup, connectionString, options);
processor.ProcessEventAsync += async (processingEventArgs) => ...
processor.ProcessErrorAsync += async (processingErrorEventArgs) => ...

await processor.StartProcessingEventsAsync();

Balanced strategy with default load balancing intervals

var connectionString = "<< EVENT HUB CONNECTION STRING FROM PORTAL >>";
var consumerGroup = "$DEFAULT";
var checkpointStore = new BlobContainerClient(...);

var processor = new EventProcessorClient(checkpointStore, consumerGroup, connectionString);
processor.ProcessEventAsync += async (processingEventArgs) => ...
processor.ProcessErrorAsync += async (processingErrorEventArgs) => ...

await processor.StartProcessingEventsAsync();
