Akka Cluster Implementation Notes

Slightly disorganized but reasonably complete notes on the algorithms, strategies and optimizations of the Akka Cluster implementation. Could use a lot more links and context etc., but was just written for my own understanding. Might be expanded later.

Links to papers and talks that have inspired the implementation can be found on the last 10 pages of this presentation.

Akka Gossip

Gossip state

This is the Gossip state representation:

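    import scala.collection.immutable

    // Member, UniqueAddress, Reachability and VectorClock are Akka Cluster types.
    case class Gossip(
      members: immutable.SortedSet[Member],
      overview: GossipOverview,
      version: VectorClock)

    case class GossipOverview(
      seen: Set[UniqueAddress],
      reachability: Reachability)
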
  • The gossip state is a CRDT (which means that it has a monotonic merge function).
  • Gossip includes a seen set which represents all nodes that have seen this particular gossip version (versioned by the Vector Clock).
  • It also includes a list of unreachable nodes (the reachability field).
  • Cluster convergence is reached when both of the following hold:
    • All nodes are represented in the seen set (a more accurate definition would be 'Local proof of a global state in the past is reached when locally a gossip is observed where all nodes are represented in the seen set' but that is a tad more complicated), and
    • Either no members are unreachable, or all unreachable members have status down or exiting.
  • Compressing the gossip state with GZIP works very well since the gossip state contains a lot of redundant information (strings with URLs etc.).
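
The convergence rule above can be sketched roughly as follows. This is a minimal illustration, not the Akka implementation: `GossipView`, `Node` and the string addresses are simplified stand-ins for the real types, and it assumes that unreachable members with status down or exiting are exempt from the seen-set requirement, since they cannot acknowledge the gossip.

```scala
object ConvergenceSketch {
  // Simplified member statuses; hypothetical stand-ins for the real Akka types.
  sealed trait Status
  case object Joining extends Status
  case object Up extends Status
  case object Exiting extends Status
  case object Down extends Status

  final case class Node(address: String, status: Status)

  final case class GossipView(
      members: Set[Node],
      seen: Set[String],        // addresses that have seen this gossip version
      unreachable: Set[String]) // addresses currently marked unreachable

  def convergence(g: GossipView): Boolean = {
    val (unreachable, reachable) =
      g.members.partition(m => g.unreachable.contains(m.address))
    // Assumption: only reachable members are required to be in the seen set.
    val allSeen = reachable.forall(m => g.seen.contains(m.address))
    // Unreachable members block convergence unless they are down or exiting.
    val unreachableOk =
      unreachable.forall(m => m.status == Down || m.status == Exiting)
    allSeen && unreachableOk
  }
}
```

With this model, a single member that is unreachable and still up is enough to block convergence, which matches the description in the unreachability section further down.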

Gossip Algorithm

General

  • Every gossip round is performed by picking a random node with an older or newer version, if one exists, and otherwise just any random node.
  • The gossiper then sends a gossip message to the selected recipient node in a request/reply conversational fashion.
  • The recipient uses the gossip version to determine whether it has a newer, older or conflicting version of the gossip state.
    • In the case of conflicting gossip versions the different versions are merged, else
    • The gossip with the newest version is kept.
  • The recipient adds itself to the seen table and sends back the updated gossip state to the gossiper.
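
A rough sketch of how the recipient could compare versions and decide between keeping, replacing and merging. All names here (`VClock`, `State`, `receive`) are made up for illustration. The member merge is a plain set union and the seen set is reset on merge; both are simplifying assumptions, not the actual Akka merge logic.

```scala
object VectorClockSketch {
  sealed trait Relation
  case object Same extends Relation
  case object Before extends Relation
  case object After extends Relation
  case object Concurrent extends Relation

  final case class VClock(versions: Map[String, Long] = Map.empty) {
    // Increment this node's counter, e.g. after a local state change.
    def bump(node: String): VClock =
      VClock(versions.updated(node, versions.getOrElse(node, 0L) + 1))

    def relationTo(that: VClock): Relation = {
      val keys = versions.keySet ++ that.versions.keySet
      val less = keys.exists(k => versions.getOrElse(k, 0L) < that.versions.getOrElse(k, 0L))
      val greater = keys.exists(k => versions.getOrElse(k, 0L) > that.versions.getOrElse(k, 0L))
      (less, greater) match {
        case (false, false) => Same
        case (true, false)  => Before
        case (false, true)  => After
        case (true, true)   => Concurrent // conflicting versions
      }
    }

    // Pairwise maximum: monotonic, commutative and idempotent.
    def merge(that: VClock): VClock = {
      val keys = versions.keySet ++ that.versions.keySet
      VClock(keys.map(k => k -> math.max(versions.getOrElse(k, 0L), that.versions.getOrElse(k, 0L))).toMap)
    }
  }

  final case class State(members: Set[String], seen: Set[String], version: VClock)

  // What the recipient `self` keeps after receiving `remote`.
  def receive(self: String, local: State, remote: State): State = {
    val winner = local.version.relationTo(remote.version) match {
      case Same       => local.copy(seen = local.seen ++ remote.seen)
      case After      => local
      case Before     => remote
      case Concurrent =>
        // Assumption: a merged state is a new version nobody has seen yet.
        State(local.members ++ remote.members, Set.empty, local.version.merge(remote.version))
    }
    winner.copy(seen = winner.seen + self) // the recipient adds itself to the seen set
  }
}
```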

General Optimizations

  • If the gossiper and the recipient have seen the same version of the gossip (i.e. the recipient is part of the seen table and has the same gossip version as the gossiper) then the gossip state is not sent back to the gossiper.
  • If fewer than half of the nodes are in the seen set (have seen the new state) then the cluster gossips three times per second instead of once.
  • If only 1 node is left until convergence is reached then that node is likely to be flooded with gossip requests. In this case the node enters a protection mode in which it throws gossip requests away.

Biased Gossip

  • If there is no convergence, the gossiper picks, with a probability of 0.8 (80%), a node that is not part of the seen set, i.e. one that has an older version of the state.
  • For clusters larger than 400 nodes (suggested by empirical evidence) the 0.8 probability is gradually reduced to avoid overwhelming single stragglers with too many concurrent gossip requests.
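
A minimal sketch of biased peer selection. The 0.8 probability and the 400-node threshold come from the notes above; the shape of the reduction (linear, down to one tenth of the base probability at three times the threshold) is an assumption made for illustration, and `selectPeer` is a hypothetical helper name.

```scala
import scala.util.Random

object BiasedGossipSketch {
  val BaseProbability = 0.8 // from the notes
  val ReduceAfter = 400     // cluster size after which the bias is reduced

  // Assumed shape: linear decrease from 0.8 at 400 nodes to 0.08 at 1200 nodes.
  def differentViewProbability(clusterSize: Int): Double = {
    val low = ReduceAfter
    val high = low * 3
    val minP = BaseProbability / 10
    if (clusterSize <= low) BaseProbability
    else if (clusterSize >= high) minP
    else {
      val slope = (minP - BaseProbability) / (high - low)
      BaseProbability + (clusterSize - low) * slope
    }
  }

  // Pick a gossip target: usually a node outside the seen set, else any other node.
  def selectPeer(
      self: String,
      members: Vector[String],
      seen: Set[String],
      rnd: Random): Option[String] = {
    val others = members.filterNot(_ == self)
    val notSeen = others.filterNot(seen.contains)
    val preferStragglers =
      notSeen.nonEmpty && rnd.nextDouble() < differentViewProbability(members.size)
    val candidates = if (preferStragglers) notSeen else others
    if (candidates.isEmpty) None
    else Some(candidates(rnd.nextInt(candidates.size)))
  }
}
```

Passing the `Random` instance in keeps the selection reproducible in tests.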

Push/Pull Gossip

  • If convergence is reached, the cluster only gossips a special Status(VectorClock) message. As soon as there is a change to the cluster (meaning non-convergence), it goes back to Biased Gossip again.

Akka Failure Detector

Failure Detector Algorithm

  • Akka's Failure Detector is an Accrual Failure Detector, but we do not seem to have gained much from it, mainly because garbage collection in Java often causes much longer latency spikes than temporary network drops. It turns out that the configuration option with the biggest impact on accuracy is the acceptable-heartbeat-pause timeout. The Accrual Failure Detector could perhaps be removed and replaced by a hard timeout.
  • Every node picks 5 other nodes (configurable) to track heartbeats from. These nodes are picked from a hashed, ordered node ring, which increases the likelihood of monitoring across racks and data centers.
  • Heartbeats are sent out every second and every heartbeat is performed in a request/reply handshake with the replies used as input to the failure detector.
  • A much more advanced but complicated scheme was used previously, in which each node only sent a single one-way heartbeat. Each node knew which nodes to expect heartbeats from and could, based on this information alone, calculate whether the sender was unreachable. This strategy was especially complicated in the start-up phase of the cluster and has now been replaced by the simpler request/reply strategy.
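
Picking heartbeat targets from a hashed, ordered ring could look roughly like this. It is a sketch under assumptions: nodes are plain strings, `String.hashCode` stands in for whatever hash the real implementation uses, and `receivers` is a hypothetical name.

```scala
object HeartbeatRingSketch {
  // Order nodes by hash (address as tie-breaker) so that neighbours in the ring
  // are unlikely to be neighbours in the network topology.
  def ringOrder(nodes: Set[String]): Vector[String] =
    nodes.toVector.sortBy(n => (n.hashCode, n))

  // The next `monitored` nodes after `self`, walking around the ring.
  def receivers(self: String, nodes: Set[String], monitored: Int = 5): Vector[String] = {
    val ring = ringOrder(nodes + self)
    val start = ring.indexOf(self)
    val rotated = ring.drop(start + 1) ++ ring.take(start) // everyone except self
    rotated.take(monitored)
  }
}
```

Because every node uses the same ordering, each node ends up being monitored by the same number of peers once the cluster has more nodes than the configured count.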

Network Partitions and Unreachability

  • A node that is suspected to be "down" is marked as unreachable in the gossip state.
  • Only one node needs to mark a node unreachable to have the rest of the cluster mark that node unreachable.
  • When one or many nodes are marked as unreachable the cluster can no longer reach convergence and the leader cannot perform its duties. Then we have reached a so-called 'Split Brain' and the only way forward is to mark the unreachable node(s) as down, or wait for the node(s) to become reachable again. This halt only affects the leader in performing its cluster membership management and does not influence the application running on top of the cluster.
  • A node that is marked as unreachable can at any point in time come back to reachable. This can be seen as a variation of the quarantine model with a suspicion set that is used in the SWIM paper.
  • A node (or set of nodes) that is unreachable can be taken out of the cluster either through a manual down operation or by configuring the system to use auto-down. When this is done the cluster will reach convergence again and can continue working as normal.
  • An unreachable node can be set to be auto-downed after a specific time interval.
  • Once a node has been downed or removed it cannot come back to reachable again but has to be restarted and join the cluster as a new node. It can reuse the same host:port but will get a different uid.
  • If system messages cannot be delivered to a node it will be quarantined and then it cannot come back from unreachable. This can happen if there are too many unacknowledged system messages (e.g. watch, Terminated, remote actor deployment, failures of actors supervised by remote parent). Then the node needs to be moved to the down or removed states and the actor system must be restarted before it can join the cluster again.
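
The rules above (one observer is enough to make a node unreachable, unreachable nodes can come back, downed or quarantined nodes cannot) can be modelled with a small table of observer/subject records. This is a simplified model with made-up names, not the real `Reachability` class.

```scala
object ReachabilitySketch {
  sealed trait Status
  case object Reachable extends Status
  case object Unreachable extends Status
  case object Terminated extends Status // downed or quarantined: no way back

  // One record per (observer, subject) pair.
  final case class Reachability(records: Map[(String, String), Status] = Map.empty) {
    private def set(observer: String, subject: String, status: Status): Reachability =
      records.get((observer, subject)) match {
        case Some(Terminated) => this // final state, later updates are ignored
        case _                => Reachability(records.updated((observer, subject), status))
      }

    def unreachable(observer: String, subject: String): Reachability =
      set(observer, subject, Unreachable)

    def reachable(observer: String, subject: String): Reachability =
      set(observer, subject, Reachable)

    def terminated(observer: String, subject: String): Reachability =
      set(observer, subject, Terminated)

    // A single non-reachable record from any observer is enough.
    def isReachable(subject: String): Boolean =
      !records.exists { case ((_, s), st) => s == subject && st != Reachable }
  }
}
```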

Leader

  • After gossip convergence a leader for the cluster can be determined.
  • The leader is just a role. Any node can be the leader and it can change between convergence rounds.
  • There is no leader election process. The leader can always be recognized deterministically by any node whenever there is gossip convergence, and is currently the first node in the sorted node ring.
  • The role of the leader is to shift members in and out of the cluster, changing joining members to the up state or exiting members to the removed state. Currently leader actions are only triggered by receiving a new cluster state with gossip convergence.
  • The leader also has the power, if configured so, to auto-down a node that according to the Failure Detector is considered unreachable.
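
A small sketch of deterministic leader selection and the leader's membership actions. The ordering by address string and the `Member` and `Status` types are assumptions for illustration; the real implementation uses its own member ordering.

```scala
object LeaderSketch {
  sealed trait Status
  case object Joining extends Status
  case object Up extends Status
  case object Exiting extends Status

  final case class Member(address: String, status: Status)

  // No election: every node computes the same answer from the same member set.
  // Assumption: the ring is sorted by address string.
  def leader(members: Set[Member]): Option[Member] =
    members.toVector.sortBy(_.address).headOption

  // Leader actions only run when there is convergence.
  def leaderActions(members: Set[Member], converged: Boolean): Set[Member] =
    if (!converged) members
    else
      members.collect {
        case Member(address, Joining) => Member(address, Up) // joining -> up
        case m if m.status != Exiting => m                   // exiting -> removed (dropped)
      }
}
```

Since the leader is derived from the converged state, two nodes that see the same gossip always agree on who the leader is.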

Potential Future Optimizations

  • VectorClock pruning—currently we are not even removing a node when it leaves.
  • Delegated heartbeat à la SWIM to find ways around a network split.
  • "Real" push/pull gossip with fine grained hashing of state and only shipping deltas, perhaps using Merkle trees.
  • More advanced out-of-the-box auto-down patterns: perhaps taking down all the nodes on the side that consists of a minority, or taking down the side that is not running a cluster singleton, or other strategies (this can already be implemented through custom user code).