Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@pauloricardomg
Last active September 23, 2020 18:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pauloricardomg/1930c8cf645aa63387a57bb57f79a0f7 to your computer and use it in GitHub Desktop.
Save pauloricardomg/1930c8cf645aa63387a57bb57f79a0f7 to your computer and use it in GitHub Desktop.
Cassandra Cluster Membership Protocol
def bootstrap(ring, node_id, tokens):
    """Join the ring as the owner of ``tokens``.

    Two ring-membership changes bracket the data transfer:

    1. one vnode per token is published in ADDING state;
    2. the ranges now assigned to ``node_id`` are streamed in;
    3. the vnodes are published in NORMAL state if streaming succeeded,
       or in REMOVED state if it failed.

    Args:
        ring: ring-membership object exposing ``try_update_ring`` and
            ``get_ranges`` (concrete type not visible here).
        node_id: identifier of the joining node.
        tokens: tokens the node will own.

    Returns:
        True if streaming succeeded, False otherwise.

    Raises:
        Exception: if either ring-membership change is rejected.
    """
    # Phase 1: announce the new vnodes in ADDING state.
    my_vnodes = [VirtualNode(t, node_id, Status.ADDING) for t in tokens]
    if not ring.try_update_ring(my_vnodes):
        raise Exception("Cannot bootstrap node {}. Is any node down?".format(node_id))

    # Phase 2: stream the data for the newly acquired ranges.
    if stream_ranges(ring.get_ranges(node_id)):
        success = True
        print("BOOTSTRAP SUCCESS")
        final_status = Status.NORMAL
    else:
        success = False
        print("BOOTSTRAP FAILED")
        final_status = Status.REMOVED

    # Phase 3: publish the final state. NORMAL commits the join; REMOVED
    # withdraws the vnodes announced in phase 1.
    my_vnodes = [VirtualNode(t, node_id, final_status) for t in tokens]
    if not ring.try_update_ring(my_vnodes):
        raise Exception("Cannot complete ring membership change for bootstrap operation. Please complete operation manually.")
    return success
from bootstrap import bootstrap
def incremental_bootstrap(ring, node_id, tokens):
    """Bootstrap ``tokens`` one at a time, continuing past individual failures.

    Each token goes through its own ``bootstrap`` call, and therefore its own
    pair of ring-membership changes. A streaming failure for one token does
    not roll back tokens that were already added. Failed tokens are reported
    so they can be added manually.

    Args:
        ring: ring-membership object passed through to ``bootstrap``.
        node_id: identifier of the joining node.
        tokens: tokens to acquire, processed in order.

    Returns:
        True only if every token bootstrapped successfully.

    Raises:
        Exception: propagated from ``bootstrap`` when a ring-membership change
            is rejected. Tokens processed before that point keep their state.
    """
    # Bootstrap tokens sequentially using the helper bootstrap
    # method from https://gist.github.com/pauloricardomg/1930c8cf645aa63387a57bb57f79a0f7#file-bootstrap-py
    fail = []
    for token in tokens:
        if not bootstrap(ring, node_id, [token]):
            fail.append(token)
    if fail:
        # Report the tokens that actually failed (previously referenced an
        # undefined name here, raising NameError on the failure path).
        print("Failed bootstrap of tokens: {}. Add these tokens manually via 'ringtool addtoken'.".format(fail))
    return not fail
def move(ring, node_id, from_token, to_token):
    """Move this node's vnode from ``from_token`` to the unowned ``to_token``.

    The move is published as a single ring delta containing the old vnode in
    MOVING_FROM state and the new vnode in MOVING_TO state. Data is then
    streamed out of the old range and into the new one. Finally:

    * on success the old vnode becomes REMOVED and the new one NORMAL;
    * on failure the old vnode goes back to NORMAL and the new one is REMOVED.

    Args:
        ring: ring-membership object exposing ``get_vnode``, ``get_range`` and
            ``try_update_ring`` (concrete type not visible here).
        node_id: identifier of this node; must own ``from_token``.
        from_token: token currently owned by ``node_id``.
        to_token: destination token; must not have an owner.

    Returns:
        True if streaming succeeded, False otherwise.

    Raises:
        Exception: if ``from_token`` is not owned by ``node_id`` (or is
            unknown), if ``to_token`` is already owned, or if either
            ring-membership change is rejected.
    """
    from_vnode = ring.get_vnode(from_token)
    # get_vnode returns None for unowned tokens; guard before reading .owner.
    if from_vnode is None or from_vnode.owner != node_id:
        raise Exception("Cannot move from token {} because I'm not the owner of this token.".format(from_token))
    if ring.get_vnode(to_token) is not None:
        raise Exception("Cannot move to token {} because this token already has an owner.".format(to_token))

    # Create the ring delta for the move operation.
    from_vnode = from_vnode.with_status(Status.MOVING_FROM)
    to_vnode = VirtualNode(to_token, node_id, Status.MOVING_TO)
    if not ring.try_update_ring([from_vnode, to_vnode]):
        raise Exception("Cannot move token {} to {}. Is any node down?".format(from_token, to_token))

    # Stream the outbound (old) and inbound (new) ranges.
    if stream_ranges(outbound_stream=ring.get_range(from_vnode),
                     inbound_stream=ring.get_range(to_vnode)):
        success = True
        print("MOVE SUCCESS")
        move_operation = [from_vnode.with_status(Status.REMOVED), to_vnode.with_status(Status.NORMAL)]
    else:
        success = False
        print("MOVE FAILED")
        # Roll back: keep the original token, drop the destination.
        move_operation = [from_vnode.with_status(Status.NORMAL), to_vnode.with_status(Status.REMOVED)]

    # Update FROM and TO vnodes with their final status.
    if not ring.try_update_ring(move_operation):
        raise Exception("Cannot complete ring membership change for move operation. Please complete operation manually.")
    return success
def remove_node(ring, node_id, node_to_remove_id):
    """Remove ``node_to_remove_id`` from the ring on its behalf.

    Every vnode of the removed node is published in REMOVING state. Streaming
    of its ranges between the new owners is then coordinated. Finally the
    vnodes are published as REMOVED (success) or REMOVE_FAILED (failure).

    Args:
        ring: ring-membership object exposing ``get_vnodes``, ``get_ranges``
            and ``try_update_ring`` (concrete type not visible here).
        node_id: identifier of the node running the operation. It is not used
            by the visible code; kept for interface compatibility.
        node_to_remove_id: identifier of the node being removed.

    Returns:
        True if range streaming succeeded, False otherwise.

    Raises:
        Exception: if either ring-membership change is rejected.
    """
    # Mark every vnode of the node being removed as REMOVING. Ownership is
    # unchanged: the vnodes still belong to node_to_remove_id.
    vnodes_to_remove = [v.with_status(Status.REMOVING) for v in ring.get_vnodes(node_to_remove_id)]
    if not ring.try_update_ring(vnodes_to_remove):  # TODO: need to remove node_to_remove from cohort
        raise Exception("Cannot remove node {}. Is any node down?".format(node_to_remove_id))

    # Coordinate the stream of ranges between the new owners.
    if coordinate_stream_of_ranges(ring.get_ranges(node_to_remove_id)):
        success = True
        print("REMOVE SUCCESS")
        final_status = Status.REMOVED
    else:
        success = False
        print("REMOVE FAILED")
        final_status = Status.REMOVE_FAILED

    # Publish REMOVED on success, or REMOVE_FAILED otherwise.
    vnodes_to_remove = [v.with_status(final_status) for v in vnodes_to_remove]
    if not ring.try_update_ring(vnodes_to_remove):
        raise Exception("Cannot complete ring membership change for remove operation. Please complete operation manually.")
    return success
def remove_token(ring, node_id, token):
    """Give up ownership of a single ``token`` owned by ``node_id``.

    The vnode is published in REMOVING state and its range is streamed out.
    It then becomes REMOVED (stream succeeded) or is restored to NORMAL
    (stream failed).

    Args:
        ring: ring-membership object exposing ``get_vnode``, ``get_range`` and
            ``try_update_ring`` (concrete type not visible here).
        node_id: identifier of this node; must own ``token``.
        token: token to give up.

    Returns:
        True if streaming succeeded, False otherwise.

    Raises:
        Exception: if ``token`` is unowned or owned by another node, or if
            either ring-membership change is rejected.
    """
    # Previously referenced undefined names (from_token, from_vnode, my_vnodes).
    vnode_to_remove = ring.get_vnode(token)
    # get_vnode returns None for unowned tokens; guard before reading .owner.
    if vnode_to_remove is None or vnode_to_remove.owner != node_id:
        raise Exception("Cannot remove token {} because I'm not the owner of this token.".format(token))

    # Publish the vnode in REMOVING state. The delta is a list, as in every
    # other caller of try_update_ring.
    vnode_to_remove = vnode_to_remove.with_status(Status.REMOVING)
    if not ring.try_update_ring([vnode_to_remove]):
        raise Exception("Cannot remove token {}. Is any node down?".format(token))

    # Stream the range of the removed token out to its new owners.
    if stream_ranges(outbound_stream=ring.get_range(vnode_to_remove)):
        success = True
        print("REMOVE_TOKEN SUCCESS")
        vnode_to_remove = vnode_to_remove.with_status(Status.REMOVED)
    else:
        success = False
        print("REMOVE_TOKEN FAILED")
        # Roll back: the node keeps the token.
        vnode_to_remove = vnode_to_remove.with_status(Status.NORMAL)

    # Update the virtual node with its final status.
    if not ring.try_update_ring([vnode_to_remove]):
        raise Exception("Cannot complete ring membership change for removetoken operation. Please complete operation manually.")
    return success
def replace(ring, node_id, node_to_replace_id):
    """Take over every token of ``node_to_replace_id`` as ``node_id``.

    The replaced node's vnodes are re-published with ``node_id`` as owner in
    REPLACING state. The ranges now assigned to ``node_id`` are then streamed
    in. Finally the vnodes are published as NORMAL (success) or
    REPLACE_FAILED (failure).

    Args:
        ring: ring-membership object exposing ``get_vnodes``, ``get_ranges``
            and ``try_update_ring`` (concrete type not visible here).
        node_id: identifier of the replacing (this) node.
        node_to_replace_id: identifier of the node being replaced.

    Returns:
        True if streaming succeeded, False otherwise.

    Raises:
        Exception: if either ring-membership change is rejected.
    """
    # Re-own the replaced node's tokens under this node ID, in REPLACING state.
    vnodes_to_replace = [VirtualNode(v.token, node_id, Status.REPLACING)
                         for v in ring.get_vnodes(node_to_replace_id)]
    if not ring.try_update_ring(vnodes_to_replace):
        raise Exception("Cannot replace node {}. Is any node down?".format(node_to_replace_id))

    # Stream the data for the taken-over ranges.
    if stream_ranges(ring.get_ranges(node_id)):
        success = True
        print("REPLACE SUCCESS")
        final_status = Status.NORMAL
    else:
        success = False
        print("REPLACE FAILED")
        final_status = Status.REPLACE_FAILED

    # Publish NORMAL on success, or REPLACE_FAILED otherwise.
    vnodes_to_replace = [v.with_status(final_status) for v in vnodes_to_replace]
    if not ring.try_update_ring(vnodes_to_replace):
        raise Exception("Cannot complete ring membership change for replace operation. Please complete operation manually.")
    return success
class SafeRingMembership:
    """Apply ring-membership changes only when every cohort peer agrees.

    Coordinator side (``try_update_ring``):

    * promise the candidate ring version to itself;
    * ask every peer to promise the same version (``propose``);
    * if all accept, commit locally and broadcast the ring delta via gossip;
    * wait until every peer gossips the new ``RING_VERSION``.

    Peer side (``receive_gossip_broadcast``): apply a broadcast delta only if
    it yields the promised version, then gossip the new ``RING_VERSION``.

    While a promise is held, any other change is rejected. The promise lasts
    until it is cleared or its TTL (``MAX_NETWORK_PARTITION_SECONDS``) expires.
    """

    MAX_NETWORK_PARTITION_SECONDS = 259200  # 3 days
    GOSSIP_INTERVAL_SECONDS = 1

    def __init__(self, gossiper, saved_ring=[]):
        """Build the local ring from ``saved_ring`` plus each peer's gossiped delta.

        Args:
            gossiper: gossip service exposing ``peers``, ``broadcast`` and
                ``register_broadcast_interested``.
            saved_ring: previously persisted ring.
                NOTE(review): the ``[]`` default has no ``with_applied_delta``
                method. It only works when there are no peers, so callers
                presumably always pass a ring object. It is never mutated
                (only rebound), so it is not a shared-state hazard.
        """
        self.gossiper = gossiper
        # Version this node has promised to accept; unset when no change is pending.
        self.promissed_version = Register(value=None)
        self.cohort = gossiper.peers
        # Register itself for gossiper broadcast notifications.
        gossiper.register_broadcast_interested(self)
        # Init local ring. This loop previously iterated the undefined self.peers.
        # NOTE(review): the other methods use ring.apply(delta); confirm that
        # with_applied_delta is the intended method here.
        local_ring = saved_ring
        for p in self.cohort:
            local_ring = local_ring.with_applied_delta(p.gossip_state["RING_DELTA"])
        self.local_ring = local_ring

    def try_update_ring(self, ring_delta):
        """Coordinate a cohort-wide ring change applying ``ring_delta``.

        Returns:
            True once every peer has gossiped the new ring version.
            False if a promise is already pending, the cohort is not valid,
            or any peer rejects the proposal.

        Note:
            Blocks, polling every ``GOSSIP_INTERVAL_SECONDS``, until all peers
            report the new version. There is no timeout.
        """
        from time import sleep

        # A: Abort if already promised a ring membership change to someone.
        if self.promissed_version.is_set():
            return False
        # B: Abort if cohort does not meet pre-conditions (all live, same ring version, etc).
        if not is_valid(self.cohort):
            return False
        # C: Promise the new ring version to itself.
        ring_candidate = self.local_ring.apply(ring_delta)
        new_version = ring_candidate.version
        self.promissed_version.set(new_version, ttl=self.MAX_NETWORK_PARTITION_SECONDS)
        # D: Send the proposal to all nodes in the cohort.
        acks = [p.remote_propose(self.local_ring.version, new_version) for p in self.cohort]
        # E: Abort if not all peers promise to accept the proposal.
        if not all(acks):
            self.promissed_version.set(None)
            # TODO: send abort to peers that accepted the proposal; until then
            # they stay promised (blocking other changes) until their TTL expires.
            return False
        # F: Commit locally, then broadcast the ring delta via gossip. The
        # cohort excludes this node (otherwise its own propose would reject at
        # step A), so without the local commit this node would keep the old
        # version and peers would reject its next proposal.
        self.local_ring = ring_candidate
        self.gossiper.broadcast("RING_VERSION", self.local_ring.version)
        self.gossiper.broadcast("RING_DELTA", ring_delta)
        # G: Wait for all peers to receive the new version via gossip.
        while not all(p.gossip_state["RING_VERSION"] == new_version for p in self.cohort):
            sleep(self.GOSSIP_INTERVAL_SECONDS)
        # H: Ring membership change completed with success.
        self.promissed_version.set(None)
        return True

    def propose(self, peer_version, proposed_version):
        """Peer side: promise ``proposed_version`` if free and at ``peer_version``.

        Returns:
            True if the promise was recorded. False if a promise is already
            pending or this node's ring version differs from ``peer_version``.
        """
        # A: Reject the proposal if already promised a change to someone.
        if self.promissed_version.is_set():
            return False
        # B: Reject the proposal if the peer is not on the same version as this node.
        if self.local_ring.version != peer_version:
            return False
        # C: Accept the proposal.
        self.promissed_version.set(proposed_version, ttl=self.MAX_NETWORK_PARTITION_SECONDS)
        return True

    def receive_gossip_broadcast(self, state, remote_delta):
        """Peer side: apply a gossiped ring delta that yields the promised version.

        Returns:
            True if the delta was committed. False if ``state`` is not
            ``"RING_DELTA"``, no promise is pending, or the resulting version
            differs from the promised one.
        """
        # We're only interested in RING_DELTA state.
        if state != "RING_DELTA":
            return False
        # We only apply a ring delta if we promised a ring membership change.
        if not self.promissed_version.is_set():
            return False
        # Do not accept a ring delta that doesn't generate the promised version.
        ring_candidate = self.local_ring.apply(remote_delta)
        if ring_candidate.version != self.promissed_version.get():
            return False
        # Commit the promised version and broadcast it via gossip.
        self.local_ring = ring_candidate
        self.gossiper.broadcast("RING_VERSION", self.local_ring.version)
        self.promissed_version.set(None)
        return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment