Skip to content

Instantly share code, notes, and snippets.

@duarten
Last active July 10, 2019 19:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save duarten/a9a6a7f6aaf4f94f9911d35b941293d3 to your computer and use it in GitHub Desktop.
Save duarten/a9a6a7f6aaf4f94f9911d35b941293d3 to your computer and use it in GitHub Desktop.
Raft protocol interfaces
#pragma once
#include <seastar/include/seastar/core/future.hh>
#include <seastar/include/seastar/core/lowres_clock.hh>
#include <seastar/include/seastar/core/reactor.hh>
#include <seastar/include/seastar/net/socket_defs.hh>
#include <compare>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <utility>
#include <variant>
#include <vector>
namespace raft {
// Identifies a member of a Raft group by its network address.
struct replica {
// Socket address of the replica.
seastar::socket_address id;
};
// Describes the leader of a Raft group.
struct group_leader {
// The replica that is the leader.
replica node;
// Declared only; the definition is not part of this header. Presumably reports
// whether `node` is the node this code is running on -- confirm against the definition.
bool current_node() const;
};
// Strongly-typed wrapper around a Raft term number.
class term {
    // Zero-initialised so that a default-constructed term never exposes an
    // indeterminate value through value().
    uint32_t _val = 0;
public:
    // Constructs the term with value 0.
    term() = default;
    // Wraps the given raw term number; explicit to avoid accidental conversions
    // from plain integers.
    explicit term(uint32_t val) noexcept
        : _val(val) {
    }
    // Returns the raw term number.
    uint32_t value() const noexcept {
        return _val;
    }
    // Three-way comparison yielding a total order; only declared here, the
    // definition is expected elsewhere.
    friend std::strong_ordering operator<=>(const term&, const term&);
};
// Strongly-typed wrapper around a 64-bit position in the log.
class log_offset {
    // Zero-initialised so that a default-constructed offset never exposes an
    // indeterminate value through position().
    uint64_t _pos = 0;
public:
    // Constructs the offset at position 0.
    log_offset() = default;
    // Wraps the given raw position. The parameter is as wide as the stored
    // value, so positions at or above 2^32 are no longer silently truncated.
    explicit log_offset(uint64_t pos) noexcept
        : _pos(pos) {
    }
    // Returns the raw position.
    uint64_t position() const noexcept {
        return _pos;
    }
    // Three-way comparison yielding a total order; only declared here, the
    // definition is expected elsewhere.
    friend std::strong_ordering operator<=>(const log_offset&, const log_offset&);
};
struct log_index {
term term;
seastar::shard_id leader_shard;
log_offset offset;
};
// Clock producing time_point values. Judging by the alias names it is presumably a
// hybrid logical clock combining a physical and a logical component; the
// implementation is not visible in this header -- confirm against it.
class hybrid_clock {
public:
    using logical_component = uint32_t;
    using physical_component = uint32_t;
    // Opaque 64-bit timestamp produced by the clock.
    class time_point {
        // Zero-initialised so a default-constructed time_point is never indeterminate.
        uint64_t _rep = 0;
    public:
        // Equality comparison; only declared here. This replaces the former
        // `std::weak_equality operator<=>`: the equality comparison categories
        // were removed from C++20, and `operator==` is the standard way to
        // express an equality-only comparison (`!=` is derived from it).
        friend bool operator==(const time_point&, const time_point&);
    };
    // Feeds a logical component into the clock (declaration only; presumably one
    // observed from a remote node -- confirm against the implementation).
    void update(logical_component);
    // Returns the current time of this clock (declaration only).
    time_point now();
};
// Strongly-typed identifier of a Raft group.
class group_id {
    uint64_t _id;
public:
    // Wraps the given raw identifier; explicit to avoid accidental conversions
    // from plain integers. There is deliberately no default constructor.
    explicit group_id(uint64_t id) noexcept
        : _id(id) {
    }
    // Two ids are equal exactly when their raw identifiers are equal. This
    // replaces the former `std::strong_equality operator<=>`: the equality
    // comparison categories were removed from C++20, and `operator==` is the
    // standard way to express an equality-only comparison (`!=` is derived
    // from it).
    friend bool operator==(const group_id& lhs, const group_id& rhs) noexcept {
        return lhs._id == rhs._id;
    }
};
// A Raft group: its identifier together with the replicas that belong to it.
struct group {
// Identifier of the group.
group_id id;
// The replicas that make up the group.
std::vector<replica> replicas;
};
namespace provider {
// Sends Raft messages between members of a group. The module is not specific to a Raft group.
class rpc {
public:
    // Allows an implementation to be destroyed safely through a pointer to this interface.
    virtual ~rpc() = default;

    // A log entry carried by append_entries(): its payload and its log index.
    struct entry {
        fragmented_temporary_buffer data;
        log_index index;
    };
    // Reply: the follower successfully processed the RPC.
    struct success { };
    // Reply: the sender is stale; carries the current term.
    struct stale_leader {
        term current_term;
    };
    // Reply: the last index the follower has on its log for the current term,
    // sent when it doesn't match the index of the first entry.
    struct log_index_mismatch {
        log_index current_log_index;
    };
    // Reply: the log indexes of all leader shards for the last term for which
    // the follower received entries.
    struct prev_term_indexes {
        std::vector<log_index> indexes;
    };
    struct got_vote_tag { };
    // Strongly-typed boolean telling whether the vote was granted.
    using got_vote = bool_class<got_vote_tag>;

    // Sends the append entries payload, containing the following:
    // struct append_entries_payload {
    //     term leader_term;
    //     replica leader;
    //     uint32_t shard;
    //     log_index leader_commit_index;
    //     log_index prev_log_index;
    //     term prev_log_term;
    //     entry entries[];
    // };
    // The follower either successfully processes the RPC, or updates a stale leader with
    // the current term, or sends the last index it has on its log for the current term (if
    // it doesn't match the index of the first entry), or returns the log indexes of all leader
    // shards for the last term for which it received entries.
    virtual future<std::variant<success, stale_leader, log_index_mismatch, prev_term_indexes>> append_entries(
            replica destination,
            term leader_term,
            log_index leader_commit_index,
            log_index prev_log_index,
            std::vector<entry>) = 0;

    // Sends the request vote payload, containing the following:
    // struct request_vote_payload {
    //     term candidate_term;
    //     replica candidate;
    //     log_index last_index;
    // };
    // The candidate either receives the vote or not, or it is updated with the current term of the group.
    virtual future<std::variant<got_vote, stale_leader>> request_vote(
            std::vector<replica> peers,
            term candidate_term,
            log_index last_index) = 0;
};
// Sends heartbeats on behalf of all groups of this node, and across all shards.
// Should exist only on one shard.
class heartbeats {
public:
    // The callback receives the election timeout to use. That timeout can be biased
    // by how many groups the current node is already a leader of.
    using on_leader_timeout = std::function<future<group_leader>(const group&, seastar::lowres_clock::time_point)>;
protected:
    // Callback supplied at construction; available to implementations.
    on_leader_timeout _on_leader_timeout;
public:
    // Stores the leader-timeout callback. Explicit so that a callable is never
    // implicitly converted into a heartbeats object.
    explicit heartbeats(on_leader_timeout cb)
        : _on_leader_timeout(std::move(cb)) {
    }
    // Allows an implementation to be destroyed safely through a pointer to this interface.
    virtual ~heartbeats() = default;

    // Registers a new Raft group, for which leader election should be triggered.
    virtual future<group_leader> register_group(const group&) = 0;
    // Stop sending heartbeats for this group, as leadership has been relinquished.
    virtual future<> relinquish_leadership(const group&) = 0;
    // Sets the commit index for a particular group. If the current node is the leader,
    // then that commit index is sent in heartbeat messages.
    virtual future<> set_commit_index(const group&, log_index) = 0;
    // Unregisters a Raft group.
    virtual future<> unregister_group(const group&) = 0;
};
// State-machine for a particular Raft group.
class state_machine {
public:
    // The operations are public so that the consumer of this provider can call
    // them; under the `class` default they would all be private.

    // Allows an implementation to be destroyed safely through a pointer to this interface.
    virtual ~state_machine() = default;

    // Transfers the contents of the state machine to the specified replica.
    virtual future<> transfer_to(replica) = 0;
    // Applies the specified entry to the state machine.
    virtual future<> apply(fragmented_temporary_buffer) = 0;
};
// Stores the persistent Raft state belonging to a Raft group.
class state {
public:
    // The operations are public so that the consumer of this provider can call
    // them; under the `class` default they would all be private.

    // Allows an implementation to be destroyed safely through a pointer to this interface.
    virtual ~state() = default;

    // Registers the replica for which the current node voted, for the specified term.
    // Should replicate that information across shards. When the future resolves, the
    // information should be persisted on disk.
    virtual future<> register_vote(term, replica) = 0;
    // Registers the current commit index for the current leader shard. When the future resolves,
    // the information is not guaranteed to have been persisted.
    virtual future<> register_commit_index_relaxed(log_index) = 0;
    // Registers the previous information of the discarded log for the current leader shard.
    // When the future resolves, the information should be persisted on disk.
    virtual future<> register_log_compaction(log_index, std::vector<replica> config) = 0;
};
// Implements the persistent Raft log for a particular Raft group.
class log {
public:
    // The operations and the process_entry alias are public so that the consumer
    // of this provider can use them; under the `class` default they would all be private.

    // Allows an implementation to be destroyed safely through a pointer to this interface.
    virtual ~log() = default;

    // Callback invoked by read() for each entry.
    using process_entry = std::function<future<>(const fragmented_temporary_buffer&)>;

    // Appends the specified entry for the current shard, at the specified term.
    // When the future resolves, the entry should be persisted in stable storage.
    // The size_t argument is presumably the serialized size of the entry, and the
    // function writes the entry to the provided stream -- confirm with implementations.
    virtual future<log_index> append(
            term,
            size_t,
            std::function<void(fragmented_temporary_buffer::ostream&)>) = 0;
    // The persisted tail of the log for the current shard, used to include in the append_entries RPC.
    virtual log_index tail() const = 0;
    // The tail of the log for all shards of a term, used to bring a follower up to date.
    virtual std::vector<log_index> tail_for_all_shards_of(term) = 0;
    // Discards a prefix of the log, to support log compaction.
    virtual future<> discard_prefix(log_index) = 0;
    // Discards a suffix of the log, to support removing entries from an outdated follower.
    virtual future<> discard_suffix(log_index) = 0;
    // Reads a set of entries in the specified range.
    virtual future<> read(log_index, log_index, process_entry) = 0;
};
} // namespace provider
// The base type of Raft entries. There can be multiple types of entries managed
// by the same Raft group, each with different requirements.
class entry {
public:
    // The operations are public so that code holding an entry can call them;
    // under the `class` default they would all be private.
    using type = unsigned;

    // Entries are handled through owning pointers to this base type, so the
    // destructor must be virtual.
    virtual ~entry() = default;

    // The type of entry, useful to retrieve associated types (e.g., a deserializer).
    virtual type entry_type() const = 0;
    // Serializes the entry to the specified buffer.
    virtual void write(fragmented_temporary_buffer::ostream&) = 0;
    // Called when the entry has been replicated across the set of replicas of the group.
    // Side-effects should be idempotent, as the callback can be called multiple times (e.g.,
    // when replaying the log).
    virtual future<> on_replicated() = 0;
    // Sets the timestamp.
    virtual void set_timestamp(hybrid_clock::time_point) = 0;
};
// Deserializes entries of a particular type.
class entry_deserializer {
public:
    // Allows an implementation to be destroyed safely through a pointer to this interface.
    virtual ~entry_deserializer() = default;

    // Reads an entry from the buffer. `entry` is abstract and cannot be returned
    // by value, so the concrete entry is returned through an owning pointer.
    virtual std::unique_ptr<entry> read(const fragmented_temporary_buffer&) = 0;
};
// Exposes the Raft protocol to external consumers, for a particular group.
// Consumes the heartbeat, rpc, state machine, state and log providers.
class protocol {
public:
    // Allows an implementation to be destroyed safely through a pointer to this interface.
    virtual ~protocol() = default;

    // Specifies whether the current node is the leader of this Raft instance.
    virtual bool is_leader() const = 0;
    // Returns the leader, useful to forward requests to it.
    virtual group_leader leader() const = 0;
    // Replicates the specified entry across the Raft group. Returns when the entry has been committed.
    // `entry` is abstract and cannot be passed by value, so ownership of the concrete
    // entry is handed over through an owning pointer.
    virtual future<> replicate(std::unique_ptr<entry>) = 0;
    // When the group's configuration changes. Calls to this function are ordered w.r.t. calls to replicate().
    virtual future<> update_configuration(std::vector<replica>) = 0;
};
} // namespace raft
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment