@conniey
Last active May 31, 2024 19:05
Geo-DR Event Hubs

Geo-disaster recovery and replication in Event Hubs

Introduction

Event Hubs provides recovery and replication features to ensure that your data is safe and available in the case of a disaster. Replicating data to a secondary region introduces a problem: the same event will not have the same offset in the secondary region as in the primary region, so an offset cannot be used to identify a position across regions. The suggested solution is to use an epoch that denotes the number of times the primary region has changed.

Terms

  • Sequence number: Long denoting the order in which events were sent to the Event Hub.
  • Replication segment: Integer denoting the replication configuration.
    • The service design documentation calls this "replication epoch". We use "replication segment" to avoid confusion with other meanings of "epoch".
  • Offset: Long denoting the position of an event within a partition of a region. The same event will have a different offset once it is replicated.

API Changes

Language API view

The gist

  1. Add Integer getReplicationSegment() to EventData.
  2. Add Integer getLastEnqueuedReplicationSegment() to PartitionProperties
  3. Add Integer getBeginningReplicationSegment() to PartitionProperties
  4. Add Integer getReplicationSegment() to Checkpoint.
  5. Add Integer getReplicationSegment() to LastEnqueuedEventProperties.
    • NOTE: Unsure about this one. Do we fully name it? The class already has the name. Is this populated when geo-DR is enabled?
  6. Add overloads to EventPosition to create a position using a replication segment.
    • EventPosition fromSequenceNumber(long sequenceNumber, int replicationSegment)
    • EventPosition fromSequenceNumber(long sequenceNumber, int replicationSegment, boolean isInclusive)
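
To make the shape of the proposed overloads concrete, here is a minimal, self-contained sketch. SegmentedPosition is a hypothetical stand-in for EventPosition, not the SDK class. Two details are assumptions: UNKNOWN_SEGMENT = -1 as the "no replication segment supplied" value, and isInclusive defaulting to false in the two-argument overload.

```java
/** Hypothetical stand-in for EventPosition; illustrates the proposed overloads only. */
final class SegmentedPosition {
    // Assumption: -1 means "replication segment unknown, let the service pick a default".
    static final int UNKNOWN_SEGMENT = -1;

    private final long sequenceNumber;
    private final int replicationSegment;
    private final boolean inclusive;

    private SegmentedPosition(long sequenceNumber, int replicationSegment, boolean inclusive) {
        this.sequenceNumber = sequenceNumber;
        this.replicationSegment = replicationSegment;
        this.inclusive = inclusive;
    }

    /** Existing-style factory: no replication segment supplied. */
    static SegmentedPosition fromSequenceNumber(long sequenceNumber) {
        return fromSequenceNumber(sequenceNumber, UNKNOWN_SEGMENT, false);
    }

    /** Proposed overload; assumed to be exclusive by default. */
    static SegmentedPosition fromSequenceNumber(long sequenceNumber, int replicationSegment) {
        return fromSequenceNumber(sequenceNumber, replicationSegment, false);
    }

    /** Proposed overload with explicit inclusivity. */
    static SegmentedPosition fromSequenceNumber(long sequenceNumber, int replicationSegment,
                                                boolean isInclusive) {
        return new SegmentedPosition(sequenceNumber, replicationSegment, isInclusive);
    }

    long getSequenceNumber() { return sequenceNumber; }
    int getReplicationSegment() { return replicationSegment; }
    boolean isInclusive() { return inclusive; }
}
```

Funnelling every overload into the three-argument factory keeps the defaulting rules in one place.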

Implementation changes

  • Add "com.microsoft:georeplication" to DesiredCapabilities when creating a receive or send link.
    • There is no current scenario regarding the producer, but ZZ mentions it may allow for flexibility.
  • Update the receiver filter on "amqp.annotation.x-opt-sequence-number" to include both replication segment and sequence number, in the form '<ReplicationSegment>:<SequenceNumber>'.
    • Example: Replication segment is 10, sequence number is 1555. The field "amqp.annotation.x-opt-sequence-number" contains 10:1555
  • Extract replication segment to populate PartitionProperties. Keys are begin_sequence_number_epoch and last_enqueued_sequence_number_epoch in the map.
  • Extract replication segment to populate EventData.getReplicationSegment() and SystemProperties. Key is x-opt-sequence-number-epoch.
  • Use both replication segment and sequence number to consume events.
  • When checkpointing, include the replication segment in the checkpoint.
    • Metadata key: "replicationsegment"
  • Prioritise looking for sequence number and replication segment rather than offset in our internal checkpoint logic.
    • IIRC, our current logic checks for the Checkpoint's offset, then sequence number.
  • When trackLastEnqueuedEventProperties is set to true, extract replication segment value from AMQP message annotation section.
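
The composite filter value described above could be built roughly as follows. This is a sketch under stated assumptions, not the SDK implementation: the class and method names are hypothetical, the ">" / ">=" comparison operators are assumed, and an unknown replication segment is sent as an explicit -1.

```java
/** Hypothetical helper that builds the receiver filter for a segment-aware position. */
final class SequenceNumberFilter {
    // Annotation name taken from the design notes above.
    static final String ANNOTATION = "amqp.annotation.x-opt-sequence-number";
    // Assumption: -1 is sent when the replication segment is not known.
    static final int UNKNOWN_SEGMENT = -1;

    /** Builds "<ReplicationSegment>:<SequenceNumber>", e.g. 10 and 1555 give "10:1555". */
    static String compositeValue(Integer replicationSegment, long sequenceNumber) {
        int segment = (replicationSegment == null) ? UNKNOWN_SEGMENT : replicationSegment;
        return segment + ":" + sequenceNumber;
    }

    /** Builds the full filter expression; the operator choice is an assumption. */
    static String expression(Integer replicationSegment, long sequenceNumber, boolean isInclusive) {
        String operator = isInclusive ? ">=" : ">";
        return ANNOTATION + " " + operator + " '"
            + compositeValue(replicationSegment, sequenceNumber) + "'";
    }
}
```

With the example values from the list, compositeValue(10, 1555) yields 10:1555.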

Scenarios and expected behaviour

GeoDR enabled namespace

1. Customer with a new client library. Works for both existing/non-existing checkpoints.

Scenario for existing checkpoints:

  1. Customer was using v1.0 of the library to process their events and persisted checkpoints.
  2. Customer upgrades to v1.1 with GeoDR support.
  3. Starts up their app and begins processing the same Event Hub.

Scenario for non-existing checkpoints:

  1. Customer updates to v1.1 with GeoDR support.
  2. Starts processing events on a new Event Hub.

What the SDK would do:

  • The client sends the replication capability (automatically, no explicit opt-in).
  • A reader requests a starting position with sequence number, but no epoch specified.
    • Client library can either pass in -1 or "" (empty string). Service will assume a default.

2. Customer is using old client library.

  • FYI. No implementation on our end.
    • Old client library does not send the "com.microsoft:georeplication" property.
    • Service will either assume an epoch by default or have an opt-in to fail.

Non-GeoDR enabled namespace

1. Customer is using new client library

  • The client sends the replication capability (automatically, no explicit opt-in)
  • Service recognises it is not GeoDR, and ignores the capability.
  • Works as it normally does:
    • A reader requests a starting position with sequence number, but no epoch specified
    • A reader requests a starting position with offset – the service accepts this as valid
    • A reader requests a starting position with an enqueue time – the service accepts this as valid

2. Customer is using old client library

  • All the same behaviour as existing library.

References

Hero Scenarios

Addition of geo-disaster recovery support is transparent. When customers upgrade to the latest version of the SDK, the SDK checks with the service whether customers have Geo-DR support. If there is no support, customers maintain the same experience.

Getting events

Using EventProcessorClient

Experience is the same. The SDK will update the checkpoints with replicationSegment if that field is present in EventData. Event Hubs service will send back the latest replication segment or -1 if that data is not present in existing checkpoints.
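
The checkpoint lookup order proposed in the implementation changes (sequence number and replication segment first, offset second) can be sketched as below. StoredCheckpoint and CheckpointResolver are hypothetical, simplified types, not the SDK's Checkpoint; the string return value exists only to make the chosen branch visible, and -1 for a missing replication segment is an assumption.

```java
/** Hypothetical resolver showing the proposed priority when reading a checkpoint. */
final class CheckpointResolver {
    // Assumption: -1 is used when an older checkpoint has no replication segment.
    static final int UNKNOWN_SEGMENT = -1;

    /** Simplified checkpoint; any field may be null if it was never persisted. */
    static final class StoredCheckpoint {
        final Long offset;
        final Long sequenceNumber;
        final Integer replicationSegment;

        StoredCheckpoint(Long offset, Long sequenceNumber, Integer replicationSegment) {
            this.offset = offset;
            this.sequenceNumber = sequenceNumber;
            this.replicationSegment = replicationSegment;
        }
    }

    /** Describes which starting position would be used for the given checkpoint. */
    static String resolveStart(StoredCheckpoint checkpoint) {
        if (checkpoint == null) {
            return "default";
        }
        // Prefer sequence number (with segment) because offsets differ between regions.
        if (checkpoint.sequenceNumber != null) {
            int segment = checkpoint.replicationSegment != null
                ? checkpoint.replicationSegment
                : UNKNOWN_SEGMENT;
            return "sequence " + segment + ":" + checkpoint.sequenceNumber;
        }
        // Fall back to offset only when no sequence number was stored.
        if (checkpoint.offset != null) {
            return "offset " + checkpoint.offset;
        }
        return "default";
    }
}
```

A checkpoint written by an older library version (no replication segment) still resolves to a sequence-number position, with the segment left for the service to default.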

Using lower level client

A replicationSegment overload is added to EventPosition. If customers know the replicationSegment, they can pass it in directly:

EventHubConsumerClient consumer = getConsumerClient();

long sequenceNumber = 130L;
int replicationSegment = 21;
EventPosition position = EventPosition.fromSequenceNumber(sequenceNumber, replicationSegment);

// receiveFromPartition takes the maximum number of events before the starting position.
consumer.receiveFromPartition(partitionId, numberOfEvents, position);

If customers do not know the latest replicationSegment, they can:

  1. Query PartitionProperties for that information.

var partitionProperties = consumer.getPartitionProperties(partitionId);
var replicationSegment = partitionProperties.getLastEnqueuedReplicationSegment();

EventPosition position = EventPosition.fromSequenceNumber(sequenceNumber, replicationSegment);

consumer.receiveFromPartition(partitionId, numberOfEvents, position);

  2. Pass in null or the default value, and the service will calculate the latest replication segment to use.

Additions

Geo-replication Factor

Used to determine whether Geo-DR is enabled on an Event Hubs namespace. The geo-replication factor represents the number of instances: the primary instance plus all secondaries. A factor greater than 1 means Geo-DR is enabled.

  • Extract Integer GeoReplicationFactor from management call to fetch EventHubProperties.
    • Keyed as "georeplication_factor"
    • Does not need to be exposed publicly.
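
As an illustration of the check described above, the sketch below reads "georeplication_factor" from a map standing in for the management response. The class name, the map-based response shape, and treating a missing key as "not enabled" are all assumptions.

```java
import java.util.Map;

/** Hypothetical internal helper; not part of the public API surface. */
final class GeoReplication {
    // Key name taken from the design notes above.
    static final String FACTOR_KEY = "georeplication_factor";

    /** Returns true when the factor (primary + secondaries) is greater than 1. */
    static boolean isGeoDrEnabled(Map<String, Object> managementResponse) {
        Object value = managementResponse.get(FACTOR_KEY);
        // Assumption: an absent or non-numeric value means Geo-DR is not enabled.
        if (!(value instanceof Number)) {
            return false;
        }
        return ((Number) value).intValue() > 1;
    }
}
```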

Questions

  1. What happens when service gets non-compat cursor (i.e. sequence number or numerical offset)? After switching geo-dr on, what happens when running EventProcessorClient?

    1. What is the customer experience?
      1. Does the SDK assume some default? Does the service tell us what to do instead?
    2. Do they get a specific error message?
    3. Richard was wondering: 'Is it possible to give an old offset/sequence number and have the service translate it to the new offset/sequence number?'
  2. If people want to be able to do event position math (figure out how big their event backlog is) with sequence number, how does that work when a failover happens?

    1. Document the behaviour and possible lag/gap in sequence numbers?
    2. What does the service want to do? (Send back messages that are only replicated already?)
    3. Do we provide an API to give them the backlog or figure out the gap?
  3. If replication segment (x-opt-sequence-number-epoch) is still exposed, what is the use case for it since we don't use it as part of a composite key anymore?

    1. x-opt-offset is a special string that can globally identify an event now. What would the purpose of extracting x-opt-sequence-number-epoch be? (Since I don't need to use it to create an EventPosition anymore.)

Ideas brought up

  • Do we provide an API to give them the backlog or figure out the gap?
  • What happens when consumer turns on geo-dr at runtime?
    1. Do we expose options for the ops team to pick the behaviour if they have existing offsets? At the moment, they just choose region... but we could have them choose to only fail over after all the data's been replicated, etc.
@jsquire

jsquire commented Sep 5, 2023

Looks good to me, a couple of small thoughts:

  • I'd like to normalize on Replication Segment as our term for this new epoch overload.
  • We should make the replication segment a long, at least. We probably want to make sure the service team doesn't prefer to treat it as a string, just to be safe.
  • I'd mention that if a replication segment is not known, we should pass an explicit "-1" in the implementation changes section. I had to scan for it in the scenarios, but it's pretty important knowledge.

@jsquire

jsquire commented Sep 13, 2023

Question:

  • If we're no longer supporting offset for GeoDR resources, how can we specify "beginning of stream" and "end of stream" starting positions in a neutral way? These are currently special offset values.
