-
-
Save pauloricardomg/1930c8cf645aa63387a57bb57f79a0f7 to your computer and use it in GitHub Desktop.
Cassandra Cluster Membership Protocol
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def bootstrap(ring, node_id, tokens):
    """Add ``node_id`` to the ring as the owner of ``tokens``.

    The tokens are first published in ADDING state, then the node's ranges are
    streamed, and finally the tokens are published again in NORMAL state (if
    streaming succeeded) or REMOVED state (if it failed).

    Args:
        ring: Ring membership object providing ``try_update_ring`` and
            ``get_ranges``.
        node_id: Identifier of the node being bootstrapped.
        tokens: Iterable of tokens the node should own.

    Returns:
        True if streaming succeeded, False otherwise.

    Raises:
        Exception: If either ring membership update is rejected.
    """
    # Materialise once: the tokens are walked twice (ADDING, then the final
    # state), and a one-shot iterator would be empty on the second pass.
    tokens = list(tokens)

    # Try updating the ring membership with local vnodes on ADDING state
    my_vnodes = [VirtualNode(t, node_id, Status.ADDING) for t in tokens]
    if not ring.try_update_ring(my_vnodes):
        raise Exception("Cannot bootstrap node {}. Is any node down?".format(node_id))

    # Perform streaming of new ranges
    if stream_ranges(ring.get_ranges(node_id)):
        success = True
        print("BOOTSTRAP SUCCESS")
        my_vnodes = [VirtualNode(t, node_id, Status.NORMAL) for t in tokens]
    else:
        print("BOOTSTRAP FAILED")
        success = False
        my_vnodes = [VirtualNode(t, node_id, Status.REMOVED) for t in tokens]

    # Try updating the ring membership with local vnodes on NORMAL state
    # (if the operation succeeded), or REMOVED state otherwise
    if not ring.try_update_ring(my_vnodes):
        raise Exception("Cannot complete ring membership change for bootstrap operation. Please complete operation manually.")
    return success
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from bootstrap import bootstrap | |
def incremental_bootstrap(ring, node_id, tokens):
    """Bootstrap ``tokens`` one at a time instead of all at once.

    Each token is handed to ``bootstrap`` in its own single-token call. Tokens
    whose bootstrap returns False are collected and reported; an exception
    raised by ``bootstrap`` propagates and stops processing of later tokens.

    Args:
        ring: Ring membership object forwarded to ``bootstrap``.
        node_id: Identifier of the node being bootstrapped.
        tokens: Iterable of tokens to bootstrap sequentially.

    Returns:
        True if every token was bootstrapped successfully, False otherwise.
    """
    fail = []
    # Bootstrap tokens sequentially using the helper bootstrap
    # method from https://gist.github.com/pauloricardomg/1930c8cf645aa63387a57bb57f79a0f7#file-bootstrap-py
    for token in tokens:
        if not bootstrap(ring, node_id, [token]):
            fail.append(token)

    if fail:
        # Report the tokens that failed (the original formatted an undefined name).
        print("Failed bootstrap of tokens: {}. Add these tokens manually via 'ringtool addtoken'.".format(fail))
    return not fail
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def move(ring, node_id, from_token, to_token):
    """Move ownership of ``node_id`` from ``from_token`` to ``to_token``.

    The source vnode is published as MOVING_FROM and a new target vnode as
    MOVING_TO, the affected ranges are streamed, and then both vnodes are
    published in their final states: REMOVED/NORMAL on success, or
    NORMAL/REMOVED (i.e. rolled back) on failure.

    Args:
        ring: Ring membership object providing ``get_vnode``, ``get_range``
            and ``try_update_ring``.
        node_id: Identifier of the node performing the move.
        from_token: Token currently owned by ``node_id``.
        to_token: Token to move to; must not exist in the ring yet.

    Returns:
        True if streaming succeeded, False otherwise.

    Raises:
        Exception: If ``from_token`` is not owned by ``node_id`` (including when
            it is not in the ring at all), if ``to_token`` already has an owner,
            or if either ring membership update is rejected.
    """
    from_vnode = ring.get_vnode(from_token)
    # get_vnode yields None for an unknown token (the to_token check below
    # relies on that), so treat a missing source token as "not the owner".
    if from_vnode is None or from_vnode.owner != node_id:
        raise Exception("Cannot move from token {} because I'm not the owner of this token.".format(from_token))
    if ring.get_vnode(to_token) is not None:
        raise Exception("Cannot move to token {} this node already has an owner.".format(to_token))

    # create ring delta for move operation
    from_vnode = from_vnode.with_status(Status.MOVING_FROM)
    to_vnode = VirtualNode(to_token, node_id, Status.MOVING_TO)
    move_operation = [from_vnode, to_vnode]

    # Try updating the ring with the move operation
    if not ring.try_update_ring(move_operation):
        raise Exception("Cannot move token {} to {}. Is any node down?".format(from_token, to_token))

    # Perform streaming inbound and outbound ranges
    if stream_ranges(outbound_stream=ring.get_range(from_vnode),
                     inbound_stream=ring.get_range(to_vnode)):
        success = True
        print("MOVE SUCCESS")
        move_operation = [from_vnode.with_status(Status.REMOVED), to_vnode.with_status(Status.NORMAL)]
    else:
        print("MOVE FAILED")
        success = False
        move_operation = [from_vnode.with_status(Status.NORMAL), to_vnode.with_status(Status.REMOVED)]

    # Update FROM and TO vnodes with new status
    if not ring.try_update_ring(move_operation):
        raise Exception("Cannot complete ring membership change for move operation. Please complete operation manually.")
    return success
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def remove_node(ring, node_id, node_to_remove_id):
    """Remove every vnode of ``node_to_remove_id`` from the ring.

    The vnodes of the node being removed are published in REMOVING state, the
    streaming of its ranges is coordinated, and the vnodes are then published
    in REMOVED state (on success) or REMOVE_FAILED state (on failure).

    Args:
        ring: Ring membership object providing ``get_vnodes``, ``get_ranges``
            and ``try_update_ring``.
        node_id: Identifier of the node coordinating the removal (not used by
            this function; kept for a signature parallel to the other
            operations).
        node_to_remove_id: Identifier of the node to remove.

    Returns:
        True if the coordinated streaming succeeded, False otherwise.

    Raises:
        Exception: If either ring membership update is rejected.
    """
    # Get vnodes of the node to remove and mark them as REMOVING
    vnodes_to_remove = [
        v.with_status(Status.REMOVING) for v in ring.get_vnodes(node_to_remove_id)
    ]

    # Try updating the ring membership with the vnodes on REMOVING state
    if not ring.try_update_ring(vnodes_to_remove):  # TODO: need to remove node_to_remove from cohort
        # The original message said "replace" (copy-paste from the replace operation).
        raise Exception("Cannot remove node {}. Is any node down?".format(node_to_remove_id))

    # Coordinate stream of ranges between new owners
    if coordinate_stream_of_ranges(ring.get_ranges(node_to_remove_id)):
        success = True
        print("REMOVE SUCCESS")
        vnodes_to_remove = [v.with_status(Status.REMOVED) for v in vnodes_to_remove]
    else:
        print("REMOVE FAILED")
        success = False
        vnodes_to_remove = [v.with_status(Status.REMOVE_FAILED) for v in vnodes_to_remove]

    # Try updating the ring membership with the vnodes on REMOVED state
    # (if the operation succeeded), or REMOVE_FAILED state otherwise
    if not ring.try_update_ring(vnodes_to_remove):
        raise Exception("Cannot complete ring membership change for remove operation. Please complete operation manually.")
    return success
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def remove_token(ring, node_id, token):
    """Remove a single ``token`` owned by ``node_id`` from the ring.

    The vnode is published in REMOVING state, its range is streamed out, and
    the vnode is then published in REMOVED state (on success) or back in
    NORMAL state (on failure).

    Args:
        ring: Ring membership object providing ``get_vnode``, ``get_range``
            and ``try_update_ring``.
        node_id: Identifier of the node giving up the token.
        token: Token to remove.

    Returns:
        True if streaming succeeded, False otherwise.

    Raises:
        Exception: If ``token`` is not owned by ``node_id`` (including when it
            is not in the ring at all), or if either ring membership update is
            rejected.
    """
    vnode_to_remove = ring.get_vnode(token)
    # A token that is absent from the ring cannot be owned by this node either.
    if vnode_to_remove is None or vnode_to_remove.owner != node_id:
        raise Exception("Cannot remove token {} because I'm not the owner of this token.".format(token))

    # Try updating the ring membership with vnode_to_remove on REMOVING state.
    # The ring delta is passed as a list, like every other operation does.
    vnode_to_remove = vnode_to_remove.with_status(Status.REMOVING)
    if not ring.try_update_ring([vnode_to_remove]):
        raise Exception("Cannot remove token {}. Is any node down?".format(token))

    # Stream the range of the removed token out to its new owners
    if stream_ranges(outbound_stream=ring.get_range(vnode_to_remove)):
        success = True
        print("REMOVE_TOKEN SUCCESS")
        vnode_to_remove = vnode_to_remove.with_status(Status.REMOVED)
    else:
        print("REMOVE_TOKEN FAILED")
        success = False
        vnode_to_remove = vnode_to_remove.with_status(Status.NORMAL)

    # Update virtual node with final status
    if not ring.try_update_ring([vnode_to_remove]):
        raise Exception("Cannot complete ring membership change for removetoken operation. Please complete operation manually.")
    return success
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def replace(ring, node_id, node_to_replace_id):
    """Take over every token of ``node_to_replace_id`` on behalf of ``node_id``.

    New vnodes are created for the replaced node's tokens with ``node_id`` as
    owner and published in REPLACING state, the ranges of ``node_id`` are
    streamed, and the vnodes are then published in NORMAL state (on success)
    or REPLACE_FAILED state (on failure).

    Args:
        ring: Ring membership object providing ``get_vnodes``, ``get_ranges``
            and ``try_update_ring``.
        node_id: Identifier of the replacement node.
        node_to_replace_id: Identifier of the node being replaced.

    Returns:
        True if streaming succeeded, False otherwise.

    Raises:
        Exception: If either ring membership update is rejected.
    """
    # Get vnodes of node to replace and change owner ID to this node ID
    vnodes_to_replace = [
        VirtualNode(v.token, node_id, Status.REPLACING)
        for v in ring.get_vnodes(node_to_replace_id)
    ]

    # Try updating the ring membership with local vnodes on REPLACING state
    if not ring.try_update_ring(vnodes_to_replace):
        raise Exception("Cannot replace node {}. Is any node down?".format(node_to_replace_id))

    # Perform streaming of new ranges
    if stream_ranges(ring.get_ranges(node_id)):
        success = True
        print("REPLACE SUCCESS")
        vnodes_to_replace = [v.with_status(Status.NORMAL) for v in vnodes_to_replace]
    else:
        print("REPLACE FAILED")
        success = False
        vnodes_to_replace = [v.with_status(Status.REPLACE_FAILED) for v in vnodes_to_replace]

    # Try updating the ring membership with local vnodes on NORMAL state
    # (if the operation succeeded), or REPLACE_FAILED state otherwise
    if not ring.try_update_ring(vnodes_to_replace):
        raise Exception("Cannot complete ring membership change for replace operation. Please complete operation manually.")
    return success
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class SafeRingMembership:
    """Coordinates ring membership changes between gossip peers.

    A change is proposed to every peer in the cohort, committed by broadcasting
    the ring delta via gossip once all peers promise to accept it, and treated
    as complete when every peer gossips the new ring version.
    """

    # Time-to-live applied to a promised ring version.
    MAX_NETWORK_PARTITION_SECONDS = 259200  # 3 days
    # Pause between checks of the peers' gossiped ring version.
    GOSSIP_INTERVAL_SECONDS = 1

    def __init__(self, gossiper, saved_ring=None):
        """Register with ``gossiper`` and rebuild the local ring.

        Args:
            gossiper: Gossip service exposing ``peers``, ``broadcast`` and
                ``register_broadcast_interested``.
            saved_ring: Previously persisted ring to start from. Defaults to an
                empty list, as before, but without sharing one mutable default
                object between instances.
        """
        self.gossiper = gossiper
        self.promissed_version = Register(value=None)
        self.cohort = gossiper.peers
        # Register itself for gossiper broadcast notifications
        gossiper.register_broadcast_interested(self)
        # Init local ring by replaying the ring delta gossiped by each peer
        local_ring = [] if saved_ring is None else saved_ring
        for p in self.cohort:
            # NOTE(review): the other methods call ``apply`` on the ring while
            # this one calls ``with_applied_delta`` - confirm both exist on the
            # ring type.
            local_ring = local_ring.with_applied_delta(p.gossip_state["RING_DELTA"])
        self.local_ring = local_ring

    def try_update_ring(self, ring_delta):
        """Try to apply ``ring_delta`` to the ring of every node in the cohort.

        Returns:
            True once every peer gossips the new ring version; False if this
            node already holds a promise, the cohort fails validation, or any
            peer rejects the proposal.
        """
        from time import sleep

        # A: Abort if already promised ring membership change to someone
        if self.promissed_version.is_set():
            return False
        # B: Abort if cohort does not meet pre-conditions (all live, same ring version, etc)
        if not is_valid(self.cohort):
            return False
        # C: Promise new ring version to itself
        ring_candidate = self.local_ring.apply(ring_delta)
        new_version = ring_candidate.version
        self.promissed_version.set(new_version, ttl=self.MAX_NETWORK_PARTITION_SECONDS)
        # D: Send proposal to all nodes in the cluster
        acks = [p.remote_propose(self.local_ring.version, new_version) for p in self.cohort]
        # E: Abort if not all peers promise to accept the proposal
        if not all(acks):
            self.promissed_version.set(None)
            # TODO: send abort to peers that accepted the proposal
            return False
        # F: Commit the proposal by broadcasting the ring delta via gossip
        self.gossiper.broadcast("RING_DELTA", ring_delta)
        # G: Wait for all peers to receive the new_version via gossip
        # NOTE(review): this loop has no timeout and spins until every peer
        # reports the new version.
        while not all(p.gossip_state["RING_VERSION"] == new_version for p in self.cohort):
            sleep(self.GOSSIP_INTERVAL_SECONDS)
        # H: Ring membership change completed with success
        self.promissed_version.set(None)
        return True

    def propose(self, peer_version, proposed_version):
        """Handle a proposal from a peer to move to ``proposed_version``.

        Returns:
            True if the proposal is accepted (and promised), False if this node
            already holds a promise or is on a different ring version than
            ``peer_version``.
        """
        # A: Reject proposal if already promised ring membership change to someone
        if self.promissed_version.is_set():
            return False
        # B: Reject proposal if peer is not on the same version as this node
        if self.local_ring.version != peer_version:
            return False
        # C: Accept proposal
        self.promissed_version.set(proposed_version, ttl=self.MAX_NETWORK_PARTITION_SECONDS)
        return True

    def receive_gossip_broadcast(self, state, remote_delta):
        """Commit a gossiped ring delta if it matches the promised version.

        Returns:
            True if the delta was applied to the local ring, False if the
            broadcast was ignored.
        """
        # We're only interested in RING_DELTA state
        if state != "RING_DELTA":
            return False
        # We only apply ring delta if promised ring membership change
        if not self.promissed_version.is_set():
            return False
        # Do not accept ring delta if doesn't generate promised version
        ring_candidate = self.local_ring.apply(remote_delta)
        if ring_candidate.version != self.promissed_version.get():
            return False
        # Commit promised version and broadcast it via gossip
        self.local_ring = ring_candidate
        self.gossiper.broadcast("RING_VERSION", self.local_ring.version)
        self.promissed_version.set(None)
        return True
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment