Skip to content

Instantly share code, notes, and snippets.

@jarohen
Last active September 12, 2023 16:12
Show Gist options
  • Save jarohen/24f6c4dbc505b65748f7b7a50de8d527 to your computer and use it in GitHub Desktop.
Save jarohen/24f6c4dbc505b65748f7b7a50de8d527 to your computer and use it in GitHub Desktop.
A quick hack at actually implementing the Raft algorithm to aid my learning - thanks @bbengfort and @krisajenkins for the podcast!
(ns rafting
(:require [clojure.tools.logging :as log])
(:import java.lang.AutoCloseable
(java.util.concurrent CompletableFuture Executors TimeUnit)))
(defn new-timeout-ms
  "Returns an absolute deadline in epoch milliseconds: the current time plus
  200 ms plus a random jitter of 0-149 ms."
  []
  (let [now-ms (System/currentTimeMillis)
        jitter-ms (rand-int 150)]
    (+ now-ms 200 jitter-ms)))
(defn timeout?
  "True when the agent state's `:timeout-at-ms` deadline lies strictly in the
  past relative to the current wall-clock time."
  [ag]
  (let [deadline-ms (:timeout-at-ms ag)]
    (> (System/currentTimeMillis) deadline-ms)))
(defn quorum
  "Smallest number of agents that forms a strict majority of `ag-count`."
  [ag-count]
  (-> ag-count
      (quot 2)
      inc))
(defn other-agents
  "Returns a lazy seq of the agents in `agents` whose current state has an
  `:id` different from the `:id` of the given agent state."
  [ag agents]
  (let [self-id (:id ag)
        self? (fn [!other] (= self-id (:id @!other)))]
    (remove self? agents)))
(defn append-events-resp
  "Agent action run on the sender of an append-events request when a follower
  replies.

  `ag` is this agent's state; the reply map carries `:term`, `:follower-id`,
  `:success?` and (on a successful append of events) `:last-log-idx`.
  `agents` is accepted for signature parity with the other actions and is not
  used here.

  Returns the new agent state:
  - reply from a newer term  -> step down to follower and adopt that term;
  - not leader / stale reply -> state unchanged;
  - success with an index    -> advance the follower's match/next indices;
  - failure                  -> back the follower's next index off by one."
  [{:keys [state current-term] :as ag}
   {resp-term :term, :keys [follower-id success? last-log-idx]}
   agents]
  (cond
    ;; Only a strictly newer term may demote us. Comparing with `not=` would
    ;; let a delayed reply from an older term drag `:current-term` backwards.
    (> resp-term current-term)
    (-> ag
        (assoc :state :follower
               :current-term resp-term
               :timeout-at-ms (new-timeout-ms))
        (dissoc :next-indices :match-indices))

    ;; Replies can arrive after we stopped being leader (the index vectors are
    ;; gone by then) or from an older term; touching the indices in either
    ;; case would throw or corrupt the state, so ignore them.
    (or (not= state :leader)
        (< resp-term current-term))
    ag

    success?
    (if last-log-idx
      ;; The match index never moves backwards, even if replies for replayed
      ;; batches arrive after replies for later batches.
      (let [matched (max last-log-idx
                         (get-in ag [:match-indices follower-id] -1))]
        (-> ag
            (assoc-in [:next-indices follower-id] (inc matched))
            (assoc-in [:match-indices follower-id] matched)))
      ag)

    :else
    ;; Consistency check failed on the follower: retry one entry earlier, but
    ;; never below index 0.
    (update-in ag [:next-indices follower-id] #(max 0 (dec %)))))
(defn- merge-events
  "Returns `log` with `events` folded in by their `:log-idx`: an event already
  present with the same `:term` is skipped, a conflicting entry truncates the
  log at that index before the event is appended, and an event at the end of
  the log is appended. Replaying the same batch is therefore idempotent."
  [log events]
  (reduce (fn [acc {:keys [log-idx term] :as event}]
            (cond
              (>= log-idx (count acc)) (conj acc event)
              (= term (:term (nth acc log-idx))) acc
              :else (conj (subvec acc 0 log-idx) event)))
          log
          events))

(defn append-events-req
  "Agent action run on a follower when a leader sends events or a heartbeat.

  `ag` is this agent's state. The request map carries `:term`, `:leader-id`,
  `:leader-commit-idx` and optionally `:events` (a vector of log entries with
  `:log-idx`/`:term`) and `:prev-log` (`{:log-idx .. :term ..}` of the entry
  preceding `:events`). `agents` is the vector of all agents, indexed by id.

  Always sends an `append-events-resp` to the leader and returns the new
  agent state:
  - request from an older term -> rejected, state unchanged;
  - `:prev-log` not in our log -> rejected, log truncated at that index;
  - otherwise                  -> events merged in, commit index advanced."
  [{:keys [id current-term log commit-idx] :as ag}
   {req-term :term, :keys [leader-id events prev-log leader-commit-idx]}
   agents]
  (let [reply! (fn [resp]
                 (send (nth agents leader-id) append-events-resp
                       (assoc resp :follower-id id)
                       agents))]
    (if (< req-term current-term)
      (do
        (reply! {:success? false, :term current-term})
        ag)
      (let [{prev-log-idx :log-idx, prev-log-term :term} prev-log
            ;; Any request from a current-or-newer term makes us a follower
            ;; of its sender and restarts the election timer.
            follower (-> ag
                         (assoc :state :follower
                                :current-term req-term
                                :leader-id leader-id
                                :timeout-at-ms (new-timeout-ms))
                         (dissoc :voted-for :votes-recd
                                 :next-indices :match-indices))
            inconsistent? (and prev-log
                               (or (>= prev-log-idx (count log))
                                   (not= (:term (nth log prev-log-idx))
                                         prev-log-term)))]
        (if inconsistent?
          (do
            (reply! {:success? false, :term req-term})
            ;; Clamp the cut point: `prev-log-idx` may lie beyond the end of
            ;; our log, in which case an unclamped `subvec` would throw.
            ;; The commit index is deliberately left alone on rejection.
            (update follower :log subvec 0 (min prev-log-idx (count log))))
          (let [;; Highest index this request proves we share with the leader.
                last-new-idx (cond
                               (seq events) (:log-idx (last events))
                               prev-log prev-log-idx
                               :else -1)
                ;; Never move the commit index backwards, and never past the
                ;; entries this request has actually confirmed.
                new-commit-idx (max commit-idx
                                    (min leader-commit-idx last-new-idx))]
            (dotimes [n (- new-commit-idx commit-idx)]
              (log/infof "Agent %d: apply log-idx %d" id (+ commit-idx n 1)))
            (reply! {:success? true
                     :term req-term
                     :last-log-idx (:log-idx (last events))})
            (-> follower
                (assoc :commit-idx new-commit-idx)
                (update :log merge-events events))))))))
(defn request-vote-resp
  "Agent action run on a candidate when a peer answers its vote request.

  `ag` is this agent's state; the reply map carries `:term`, `:voter-id` and
  `:vote-granted?`. `agents` is the vector of all agents, indexed by id.

  A granted vote for the current term is recorded while this agent is still a
  candidate. Once a quorum of votes is reached the agent becomes leader,
  sends an initial heartbeat to every other agent and initialises its
  per-follower index vectors. Any other reply leaves the state unchanged."
  [{:keys [id state current-term votes-recd log commit-idx] :as ag}
   {resp-term :term, :keys [voter-id vote-granted?]}
   agents]
  (log/infof "Agent %d received %s from %d" id (if vote-granted? "vote" "rejection") voter-id)
  (if-not (and (= state :candidate) vote-granted? (= current-term resp-term))
    ag
    (let [votes (conj votes-recd voter-id)
          agent-count (count agents)]
      (if (< (count votes) (quorum agent-count))
        (assoc ag :votes-recd votes)
        (do
          (log/infof "Agent %d is now leader!" id)
          (doseq [!peer (other-agents ag agents)]
            (send !peer append-events-req
                  {:term current-term, :leader-id id, :leader-commit-idx commit-idx}
                  agents))
          (-> ag
              (assoc :state :leader
                     :leader-id id
                     :next-indices (vec (repeat agent-count (count log)))
                     ;; Followers start as unknown (-1), but the leader
                     ;; trivially matches its own log up to the last entry;
                     ;; leaving its slot at -1 would make later increments
                     ;; in `submit-tx` point at the wrong index.
                     :match-indices (assoc (vec (repeat agent-count -1))
                                           id (dec (count log))))
              (dissoc :votes-recd :voted-for)))))))
(defn request-vote-req
  "Agent action run when a candidate asks this agent for its vote.

  `ag` is this agent's state; the request map carries `:term` and
  `:candidate-id`. `agents` is the vector of all agents, indexed by id.

  The vote is granted when the candidate's term is strictly newer than ours.
  Granting adopts that term, records the vote, steps down to follower and
  restarts the election timer. Otherwise a rejection carrying our current
  term is sent and the state is returned unchanged.

  NOTE(review): the request carries no log position, so the Raft
  'candidate log is at least as up-to-date' check is not performed here."
  [{:keys [id current-term] :as ag} {candidate-term :term, :keys [candidate-id]} agents]
  (log/infof "Agent %d received vote request from %d" id candidate-id)
  (let [reply! (fn [term granted?]
                 (send (nth agents candidate-id) request-vote-resp
                       {:term term
                        :voter-id id
                        :vote-granted? granted?}
                       agents))]
    ;; Only the term is compared: a `:voted-for` left over from an older term
    ;; is stale and must not block voting in a newer one (otherwise a split
    ;; vote leaves every agent refusing to vote forever). Because granting
    ;; bumps `:current-term`, at most one vote is given per term.
    (if (< current-term candidate-term)
      (do
        (reply! candidate-term true)
        (-> ag
            ;; The key must be `:current-term`; writing `:term` left the
            ;; voter's real term untouched.
            (assoc :state :follower
                   :current-term candidate-term
                   :voted-for candidate-id
                   :timeout-at-ms (new-timeout-ms))
            ;; Whatever role we had belonged to the older term; the leader of
            ;; the new term is not known until its first heartbeat.
            (dissoc :votes-recd :leader-id :next-indices :match-indices)))
      (do
        (reply! current-term false)
        ag))))
(defn tick
  "Periodic agent action driving elections and replication.

  `ag` is this agent's state; `agents` is the vector of all agents, indexed
  by id. Returns the new agent state.

  - Follower/candidate: when the election deadline has passed, starts a new
    election (next term, votes for itself, asks every other agent for a
    vote); otherwise nothing changes.
  - Leader: recomputes the commit index from the match indices, logs every
    newly committed index, and sends each other agent an append-events
    request with up to 10 pending events (or an empty heartbeat)."
  [{:keys [id state current-term] :as ag} agents]
  (case state
    (:follower :candidate)
    (if (timeout? ag)
      (let [term (inc current-term)]
        (doseq [!peer (other-agents ag agents)]
          (send !peer request-vote-req
                {:term term, :candidate-id id}
                agents))
        (assoc ag
               :state :candidate
               :current-term term
               :voted-for id
               :votes-recd #{id}
               :timeout-at-ms (new-timeout-ms)))
      ag)

    :leader
    (let [{:keys [log next-indices match-indices commit-idx]} ag
          agent-count (count agents)
          log-size (count log)
          ;; With match indices sorted ascending, the value at position
          ;; (n - quorum) is reached by at least `quorum` agents. Indexing at
          ;; `quorum` itself picks the maximum for 3 agents and is out of
          ;; bounds for 2.
          majority-idx (nth (sort match-indices)
                            (- agent-count (quorum agent-count)))
          ;; Match indices restart at -1 after an election; never let that
          ;; pull the commit index backwards.
          ;; NOTE(review): Raft additionally requires the entry at the new
          ;; commit index to belong to the current term; not enforced here.
          new-commit-idx (max commit-idx majority-idx)]
      (dotimes [n (- new-commit-idx commit-idx)]
        (log/infof "Agent %d: apply log-idx %d" id (+ commit-idx n 1)))
      (doseq [!peer (other-agents ag agents)
              :let [{peer-id :id} @!peer
                    next-idx (nth next-indices peer-id)
                    prev-log-idx (dec next-idx)]]
        (send !peer append-events-req
              {:term current-term
               :leader-id id
               ;; `subvec` takes an END index, not a length: send at most 10
               ;; entries starting at `next-idx`.
               :events (when (< next-idx log-size)
                         (subvec log next-idx (min log-size (+ next-idx 10))))
               :leader-commit-idx new-commit-idx
               :prev-log (when-not (neg? prev-log-idx)
                           {:log-idx prev-log-idx
                            :term (:term (nth log prev-log-idx))})}
              agents))
      (assoc ag :commit-idx new-commit-idx))))
(defn open-ticker
  "Starts a single-threaded scheduler that dispatches `tick` to agent `!ag`
  (via `send-off`) every 20 ms, beginning immediately. Returns an
  AutoCloseable whose `close` shuts the scheduler down and waits up to one
  second for it to terminate."
  ^AutoCloseable [!ag agents]
  (let [scheduler (Executors/newScheduledThreadPool 1)
        fire-tick (fn [] (send-off !ag tick agents))]
    (.scheduleAtFixedRate scheduler fire-tick 0 20 TimeUnit/MILLISECONDS)
    (reify AutoCloseable
      (close [_this]
        (.shutdownNow scheduler)
        (.awaitTermination scheduler 1 TimeUnit/SECONDS)))))
(defn submit-tx
  "Agent action that submits transaction `tx` to the cluster.

  `ag` is this agent's state, `!fut` the CompletableFuture to resolve and
  `agents` the vector of all agents, indexed by id. Returns the new agent
  state.

  - Leader: appends `tx` to its log, bumps its own next/match indices and
    completes `!fut` with `{:log-idx n}` (on append, not on commit).
  - No usable leader known: completes `!fut` exceptionally with \"no leader\".
  - Otherwise: forwards the submission to the known leader."
  [{:keys [id state leader-id current-term log] :as ag} tx ^CompletableFuture !fut agents]
  (cond
    (= state :leader)
    (let [log-idx (count log)]
      (.complete !fut {:log-idx log-idx})
      (-> ag
          (update :log conj {:term current-term
                             :log-idx log-idx
                             :tx tx})
          (update-in [:next-indices id] inc)
          (update-in [:match-indices id] inc)))

    ;; A former leader that stepped down can still have `:leader-id` pointing
    ;; at itself; forwarding would then re-send to this same agent forever,
    ;; so treat it like having no leader.
    (or (nil? leader-id) (= leader-id id))
    (do
      (.completeExceptionally !fut (ex-info "no leader" {}))
      ag)

    :else
    (do
      (send (nth agents leader-id) submit-tx tx !fut agents)
      ag)))
(defn submit-tx&!
  "Dispatches `submit-tx` for `tx` to a randomly chosen agent from `agents`
  and returns the CompletableFuture that the action will resolve."
  [tx agents]
  (let [!result (CompletableFuture.)
        !target (rand-nth agents)]
    (send !target submit-tx tx !result agents)
    !result))
;; REPL scratch: builds a three-agent cluster, runs it for a short while and
;; submits two transactions. Evaluating this form defines the vars below as a
;; side effect.
(do
;; Every agent starts as a follower with an empty log, term -1, nothing
;; committed (-1) and its own randomised election deadline.
(def !ag0 (agent {:id 0, :state :follower, :log [], :current-term -1, :commit-idx -1, :timeout-at-ms (new-timeout-ms)}))
(def !ag1 (agent {:id 1, :state :follower, :log [], :current-term -1, :commit-idx -1, :timeout-at-ms (new-timeout-ms)}))
(def !ag2 (agent {:id 2, :state :follower, :log [], :current-term -1, :commit-idx -1, :timeout-at-ms (new-timeout-ms)}))
;; Position in this vector must equal the agent's :id - the actions look
;; peers up with (nth agents some-id).
(def agents [!ag0 !ag1 !ag2])
;; One ticker per agent; with-open closes all of them when the body finishes.
(with-open [t0 (open-ticker !ag0 agents)
t1 (open-ticker !ag1 agents)
t2 (open-ticker !ag2 agents)]
;; Let the cluster run for 500 ms (longer than the maximum 349 ms election
;; timeout) before submitting anything.
(Thread/sleep 500)
;; Result vector: the two dereferenced submission futures, then nil (the
;; value of the 100 ms sleep, which leaves the tickers running a little
;; longer), then the agents vector itself.
[@(submit-tx&! :my-tx0 agents)
@(submit-tx&! :my-tx1 agents)
(Thread/sleep 100)
agents]
))
;; Discarded form kept for debugging: evaluate it without the #_ to see the
;; exception, if any, that put !ag0 into a failed state.
#_(agent-error !ag0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment