@srnagar
Last active October 22, 2019 04:57
Separate ownership and checkpoint stores

Proposal to have separate partition ownership and checkpoint model classes.

Currently, the PartitionOwnership class contains both ownership and checkpoint information.
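
For reference, a rough sketch of the current combined shape (field names are illustrative of the preview model, not exact):

public class PartitionOwnership {
  // partition identity
  String fullyQualifiedNamespace;
  String eventHubName;
  String consumerGroup;
  String partitionId;
  // ownership (load balancing) state
  String ownerId;
  String eTag;
  OffsetDateTime lastModifiedTime;
  // checkpoint state, currently entangled with ownership
  Long offset;
  Long sequenceNumber;
}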

Advantages

  • Decouples checkpointing and load balancing
  • Allows users to opt in to checkpointing, load balancing, or both
  • Each can evolve independently without breaking the other; e.g., adding new information to checkpoints (such as the custom checkpoints the service team discussed) doesn't impact the load balancing (ownership) APIs.
  • Checkpoints are customer data while partition ownership is event processor data. Customers may want to view and analyze checkpoint information, whereas partition ownership data can remain hidden from them.
  • Less data transferred from the store to the event processor: load balancing runs frequently, and each run currently fetches checkpoint information (sequence number and offset) for all partitions for every event processor instance. Once the event processor cluster reaches a stable state (which will be the majority of the time), checkpoint information is not needed at all when the load balancer runs. Checkpoints are required only when an event processor claims a new partition, which happens infrequently, and each event processor can read checkpoints just for the partitions it owns, further reducing the data transferred from the store.
  • Allows different stores to be used for different purposes. E.g., checkpoints can be stored in blobs while ownership uses Redis. Blobs are more durable, so keeping checkpoints there is more reliable; ownership can live in Redis, which is less durable, because losing ownership data has little impact on the Event Processor or the customer application and is easy to recover from.
  • Updating checkpoints in certain types of store (like Blob) will be more efficient because the conditional update (eTag check) is no longer required.

Changes to PartitionManager (CheckpointStore)

public class PartitionOwnership {
  String fullyQualifiedNamespace;
  String eventHubName;
  String consumerGroup;
  String partitionId;
  String ownerId; // event processor instance that owns this partition
  String eTag; // used for conditional updates when claiming ownership
  OffsetDateTime lastModifiedTime;
}

public class Checkpoint {
  String fullyQualifiedNamespace;
  String eventHubName;
  String consumerGroup;
  String partitionId;
  Long offset; // offset of the last successfully processed event
  Long sequenceNumber; // sequence number of the last successfully processed event
}

// names of the following interfaces may change
interface PartitionManager extends CheckpointManager, OwnershipManager { 
}

interface CheckpointManager {
  // lists all checkpoints for the given Event Hub and consumer group
  Flux<Checkpoint> listCheckpoints(String fullyQualifiedNamespace, String eventHubName, String consumerGroup);
  Mono<Void> updateCheckpoint(Checkpoint checkpoint);
}

interface OwnershipManager {
  // lists the current ownership of all partitions for the given Event Hub and consumer group
  Flux<PartitionOwnership> listOwnership(String fullyQualifiedNamespace, String eventHubName, String consumerGroup);
  // returns only the ownership records that were successfully claimed
  Flux<PartitionOwnership> claimOwnership(List<PartitionOwnership> requestedPartitionOwnership);
}
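
A minimal sketch of how an event processor might drive these interfaces in one load-balancing cycle. pickPartitionToClaim and startProcessing are hypothetical helpers, and simple getters on the model classes are assumed; this is not the final implementation.

import java.util.Collections;
import java.util.List;
import reactor.core.publisher.Mono;

// Hypothetical sketch of a single load-balancing cycle against the proposed interfaces.
Mono<Void> runLoadBalancerOnce(PartitionManager partitionManager,
    String fullyQualifiedNamespace, String eventHubName, String consumerGroup) {
  return partitionManager
      .listOwnership(fullyQualifiedNamespace, eventHubName, consumerGroup)
      .collectList()
      .flatMap(allOwnership -> {
        // pickPartitionToClaim is a hypothetical helper implementing the balancing
        // policy; it returns null when the cluster is already balanced.
        PartitionOwnership candidate = pickPartitionToClaim(allOwnership);
        if (candidate == null) {
          // Stable state: nothing to claim, so no checkpoint data is read at all.
          return Mono.empty();
        }
        return partitionManager
            .claimOwnership(Collections.singletonList(candidate))
            .collectList()
            .flatMap(claimed -> {
              if (claimed.isEmpty()) {
                return Mono.empty(); // another instance won the claim
              }
              // Checkpoints are fetched only after a successful claim.
              return partitionManager
                  .listCheckpoints(fullyQualifiedNamespace, eventHubName, consumerGroup)
                  .collectList()
                  .doOnNext(checkpoints -> startProcessing(claimed.get(0), checkpoints)) // hypothetical hook
                  .then();
            });
      });
}

Note how the stable-state path returns before any checkpoint read, which is the data-transfer saving described in the Advantages section above.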

Changes to Load Balancing

  • The load balancing algorithm remains the same until new partitions are claimed. Once the Event Processor successfully claims ownership of a partition, it calls listCheckpoints to get the latest checkpoints before it starts processing that partition.
  • The returned list of checkpoints is used to find the initial event position for the newly claimed partition (see the sketch after this list).
  • Updating a checkpoint no longer requires an eTag.
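
For example, the initial position could be derived as below; EventPosition is the existing Event Hubs client type, getters on Checkpoint are assumed, and the earliest() fallback is an assumption for partitions with no checkpoint yet.

import java.util.List;
import com.azure.messaging.eventhubs.models.EventPosition;

// Sketch: derive the starting position for a newly claimed partition from checkpoints.
EventPosition initialEventPosition(List<Checkpoint> checkpoints, String partitionId) {
  for (Checkpoint checkpoint : checkpoints) {
    if (partitionId.equals(checkpoint.getPartitionId())) { // assumed accessor
      if (checkpoint.getSequenceNumber() != null) {
        return EventPosition.fromSequenceNumber(checkpoint.getSequenceNumber());
      }
      if (checkpoint.getOffset() != null) {
        return EventPosition.fromOffset(checkpoint.getOffset());
      }
    }
  }
  // Assumed default: start from the beginning when no checkpoint exists.
  return EventPosition.earliest();
}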

Changes to Blob checkpoint store

  • Separate blobs for ownership and checkpoint data.
  • Ownership blobs will be stored at namespace/eventhubname/consumergroup/ownership/partition-id
  • Checkpoint blobs will be stored at namespace/eventhubname/consumergroup/checkpoint/partition-id
  • The ownership and checkpoint data will be stored as blob metadata, which allows a single listBlobs call to fetch all required data in one request (see the sketch below).
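
A rough sketch of the checkpoint side using the azure-storage-blob v12 client. The metadata key names, the assumed getters on Checkpoint, and the assumption that the blob already exists are illustrative, not the final design.

import java.util.HashMap;
import java.util.Map;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobListDetails;
import com.azure.storage.blob.models.ListBlobsOptions;

// Sketch: blob name follows namespace/eventhubname/consumergroup/checkpoint/partition-id.
void updateCheckpointBlob(BlobContainerClient containerClient, Checkpoint checkpoint) {
  String blobName = String.join("/", checkpoint.getFullyQualifiedNamespace(),
      checkpoint.getEventHubName(), checkpoint.getConsumerGroup(),
      "checkpoint", checkpoint.getPartitionId());

  Map<String, String> metadata = new HashMap<>();
  metadata.put("offset", String.valueOf(checkpoint.getOffset()));
  metadata.put("sequencenumber", String.valueOf(checkpoint.getSequenceNumber()));

  // Unconditional update: no eTag check is needed for checkpoints.
  // Assumes the checkpoint blob was created when the partition was first owned.
  containerClient.getBlobClient(blobName).setMetadata(metadata);
}

// Sketch: a single listBlobs call returns all checkpoints, with data in metadata.
void listCheckpointBlobs(BlobContainerClient containerClient, String prefix) {
  ListBlobsOptions options = new ListBlobsOptions()
      .setPrefix(prefix) // e.g. namespace/eventhubname/consumergroup/checkpoint
      .setDetails(new BlobListDetails().setRetrieveMetadata(true));
  for (BlobItem blobItem : containerClient.listBlobs(options, null)) {
    Map<String, String> metadata = blobItem.getMetadata();
    // blobItem.getName() ends with the partition id; offset and
    // sequencenumber are read from the metadata map, so no per-blob GET is needed.
  }
}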