Skip to content

Instantly share code, notes, and snippets.

@Horusiath
Last active May 7, 2020 14:45
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Horusiath/5972445107c14c9a9ce66cec42f06211 to your computer and use it in GitHub Desktop.
Save Horusiath/5972445107c14c9a9ce66cec42f06211 to your computer and use it in GitHub Desktop.
CASPaxos implementation
/// Toy implementation of CAS Paxos (see: https://github.com/rystsov/caspaxos/blob/master/latex/caspaxos.pdf).
module DemoFs.CASPaxos
open Akka.Actor
open Akkling
open System
/// Identifier of a node taking part in the protocol.
type NodeId = string

/// Ballot number: a sequence number paired with the id of the node that issued it.
/// As an F# record, ballots are compared field by field in declaration order, i.e. by
/// `SeqNr` first and by `Id` second; the proposer and acceptor rely on that ordering
/// whenever they compare ballots with `>`.
[<Struct>]
type Ballot =
    { SeqNr: int64
      Id: NodeId }
    /// Renders the ballot as `(SeqNr:Id)`.
    override ballot.ToString() = sprintf "(%i:%s)" ballot.SeqNr ballot.Id
/// Messages exchanged between clients, the proposer and the acceptors.
type Protocol<'state> =
/// Asks for the current state. Handled by the proposer (in every phase) and by the acceptor;
/// the reply is the bare `'state` value held by that actor.
| Query
/// Request for a change, sent to the proposer. The change is a lambda, so sending it across a process boundary
/// requires a serializer able to serialize functions. The sender is answered with a Result<'state,string>:
/// `Ok` with the updated state, or `Error` with a description when the deadline was reached first.
/// The deadline is compared against `DateTime.UtcNow` (similar to gRPC deadlines), which lets the proposer
/// re-submit the proposal to itself after a conflict was detected and still honour the original time limit.
/// Using a non-monotonic clock can lead to "funny" results (somewhat solvable with hybrid logical clocks), so
/// keep that in mind.
| Propose of deadline:DateTime * change:('state -> 'state)
/// 1st phase: the proposer sends `Prepare` with its new ballot to all acceptors. After a majority confirms
/// with `PrepareReply`, the proposer sends `Accept`.
| Prepare of proposal:Ballot * replyTo:Endpoint<'state>
/// Acceptor's answer to `Prepare`. With `None` the acceptor confirms and echoes the proposed ballot.
/// With `Some state` the acceptor refuses, and the message carries the acceptor's last accepted ballot
/// together with its current state.
| PrepareReply of Ballot * 'state option
/// Timeout for the entire `Propose` (calculated from its deadline), scheduled by the proposer to itself.
/// If the prepare phase fails because an acceptor reported a higher ballot, the proposal is retried
/// as long as the deadline allows.
| ProposeTimeout
/// 2nd phase: the proposer sends `Accept` with the ballot and the modified state to all acceptors. After a
/// majority responds with `AcceptReply`, the updated state is sent to the original `Propose` sender.
| Accept of Ballot * 'state * replyTo:Endpoint<'state>
/// Acceptor's confirmation of `Accept`. It carries no ballot, so the proposer identifies the acceptor
/// only through the message sender.
| AcceptReply
/// Typed actor reference accepting `Protocol` messages for a given state type.
and Endpoint<'s> = IActorRef<Protocol<'s>>
/// State carried by the proposer actor from one message to the next.
/// - `Ballot`: the ballot of the most recent round; its `SeqNr` is incremented for every new `Propose`, and it is
///   replaced (keeping this proposer's `Id`) when an acceptor reports a higher ballot.
/// - `State`: the proposer's view of the replicated value; this is what `Query` returns.
/// - `Acceptors`: the endpoints that `Prepare` and `Accept` are broadcast to.
type ProposerState<'state> =
{ Ballot: Ballot
State: 'state
Acceptors: Set<Endpoint<'state>> }
module ProposerState =
    /// Builds the initial proposer state: a ballot with sequence number 0 owned by `nodeId`,
    /// the given starting `state` and the set of `acceptors` to talk to.
    let create nodeId state acceptors =
        let initialBallot = { SeqNr = 0L; Id = nodeId }
        { Ballot = initialBallot
          State = state
          Acceptors = acceptors }
/// State carried by the acceptor actor from one message to the next.
/// - `Promise`: the ballot confirmed in the latest successful `Prepare`, if any; reset to `None` once an
///   `Accept` is confirmed.
/// - `Accepted`: the ballot of the last confirmed `Accept`.
/// - `State`: the value stored by the last confirmed `Accept` (or the initial value); returned by `Query`.
type AcceptorState<'state> =
{ Promise: Ballot option
Accepted: Ballot
State: 'state }
module AcceptorState =
    /// Builds the initial acceptor state: no promise given yet, an accepted ballot with
    /// sequence number 0 owned by `nodeId`, and the given starting `state`.
    let create nodeId state =
        let initialBallot = { SeqNr = 0L; Id = nodeId }
        { Promise = None
          Accepted = initialBallot
          State = state }
/// Bookkeeping for the single proposal a proposer is currently working on.
/// - `Change`: the function to apply to the proposer's state once the prepare phase succeeds.
/// - `HighestBallot` / `HighestState`: the ballot and state captured when the proposal was created.
///   NOTE(review): both are written on creation but never read in this file - confirm whether they are still needed.
/// - `Client`: the sender of the original `Propose`; receives the final `Ok`/`Error` result.
/// - `Pending`: acceptors that have not yet replied in the current phase.
/// - `Deadline`: the deadline taken from the `Propose` message, reused when the proposal is retried.
/// - `Timeout`: handle of the scheduled `ProposeTimeout` message, used to cancel it.
type Proposal<'state> =
{ Change: 'state -> 'state
HighestBallot: Ballot
HighestState: 'state
Client: IActorRef<Result<'state, string>>
Pending: Set<Endpoint<'state>>
Deadline: DateTime
Timeout: ICancelable }
/// Checks whether a strict majority (more than half) of `acceptors` has already responded,
/// given the subset that is still `remaining`, i.e. has not responded yet.
/// An empty `acceptors` set trivially counts as a majority.
let inline isMajority acceptors remaining =
    if Set.isEmpty acceptors then true
    else
        let total = Set.count acceptors
        let responded = total - Set.count remaining
        // A quorum must be strictly more than half. The former `remaining < total / 2 + 1`
        // check also accepted exactly half of an even-sized cluster (e.g. 2 of 4), and two
        // such halves need not overlap.
        2 * responded > total
/// Proposer behavior while idle.
/// - `Query` is answered with the current state.
/// - `Propose` starts a new round: the ballot's sequence number is incremented, `Prepare` is broadcast to all
///   acceptors, a `ProposeTimeout` is scheduled for the deadline and the actor switches to `proposing`.
///   A `Propose` whose deadline has already passed is answered with `Error` right away.
/// - Anything else is reported as unhandled.
let rec proposer (state: ProposerState<'s>) (ctx: Actor<Protocol<'s>>) = actor {
    match! ctx.Receive() with
    | Query ->
        ctx.Sender() <! state.State
        return! proposer state ctx
    | Propose(deadline, change) ->
        // Read the clock only once: the deadline check and the delay handed to the scheduler
        // then agree, so the delay can never turn out negative.
        let timeLeft = deadline - DateTime.UtcNow
        if timeLeft > TimeSpan.Zero then
            let ballot = { state.Ballot with SeqNr = state.Ballot.SeqNr + 1L }
            let msg = Prepare(ballot, ctx.Self)
            for acceptor in state.Acceptors do acceptor <! msg
            let cancel = ctx.Schedule timeLeft ctx.Self ProposeTimeout
            let proposal =
                { Client = ctx.Sender()
                  HighestBallot = ballot
                  HighestState = state.State
                  Change = change
                  Pending = state.Acceptors
                  Deadline = deadline
                  Timeout = cancel }
            logDebugf ctx "created proposal %O" ballot
            return! proposing { state with Ballot = ballot } proposal ctx
        else
            // deadline reached
            ctx.Sender() <! Error "didn't finished operation before deadline"
            return! proposer state ctx
    | _ -> return Unhandled }

/// Proposer behavior during the prepare phase of `proposal`; `state.Ballot` is the ballot of this round.
/// Further `Propose` requests are stashed until the current proposal is finished.
and private proposing (state: ProposerState<'s>) (proposal: Proposal<'s>) (ctx: Actor<Protocol<'s>>) = actor {
    match! ctx.Receive() with
    | Query ->
        ctx.Sender() <! state.State
        return! proposing state proposal ctx
    | Propose _ ->
        ctx.Stash()
        return! proposing state proposal ctx
    | PrepareReply(ballot, Some data) when ballot > state.Ballot ->
        // An acceptor already accepted a higher ballot: adopt its state and ballot number (keeping our
        // own id), then re-submit the same proposal to ourselves on behalf of the original client.
        logDebugf ctx "prepare - received higher ballot number %O" ballot
        proposal.Timeout.Cancel()
        ctx.Self.Tell(Propose(proposal.Deadline, proposal.Change), untyped proposal.Client)
        ctx.UnstashAll()
        return! proposer { state with State = data; Ballot = { ballot with Id = state.Ballot.Id } } ctx
    | PrepareReply(ballot, None) when ballot = state.Ballot ->
        // Only a confirmation (`None`) that echoes the ballot of the current round counts.
        let remaining = Set.remove (ctx.Sender()) proposal.Pending
        if isMajority state.Acceptors remaining then
            logDebugf ctx "prepare - received confirm from majority of nodes"
            let modified = proposal.Change state.State
            // Always send the ballot of the current round rather than whatever ballot the reply carried.
            let msg = Accept(state.Ballot, modified, ctx.Self)
            for acceptor in state.Acceptors do acceptor <! msg
            return! accepting { state with State = modified } { proposal with Pending = state.Acceptors } ctx
        else
            logDebugf ctx "prepare - received confirm"
            return! proposing state { proposal with Pending = remaining } ctx
    | PrepareReply(ballot, _) ->
        // Either a refusal whose ballot does not outrank ours (the acceptor promised someone else), or a late
        // reply to an earlier round. Neither is a confirmation, so it must not count towards the majority;
        // the sender simply stays pending and the deadline decides if the quorum is never reached.
        logDebugf ctx "prepare - ignoring reply for ballot %O" ballot
        return! proposing state proposal ctx
    | ProposeTimeout ->
        proposal.Client <! Error "failed to gather prepare confirmations before the deadline"
        ctx.UnstashAll()
        return! proposer state ctx
    | _ -> return Unhandled }

/// Proposer behavior during the accept phase of `proposal`; `state.State` already holds the modified value
/// that was sent out in `Accept`. Further `Propose` requests are stashed until the proposal is finished.
and private accepting (state: ProposerState<'s>) (proposal: Proposal<'s>) (ctx: Actor<Protocol<'s>>) = actor {
    match! ctx.Receive() with
    | Query ->
        ctx.Sender() <! state.State
        return! accepting state proposal ctx
    | Propose _ ->
        ctx.Stash()
        return! accepting state proposal ctx
    | AcceptReply ->
        // NOTE(review): `AcceptReply` carries no ballot, so a late reply from an earlier round cannot be
        // told apart from one belonging to this round.
        let remaining = Set.remove (ctx.Sender()) proposal.Pending
        if isMajority state.Acceptors remaining then
            logDebugf ctx "accept - received confirm from majority of nodes"
            // The proposal is done: cancel its timeout, otherwise the scheduled `ProposeTimeout` is still
            // delivered later and can abort an unrelated proposal that happens to be in flight by then.
            proposal.Timeout.Cancel()
            proposal.Client <! Ok state.State
            ctx.UnstashAll()
            return! proposer state ctx
        else
            logDebugf ctx "accept - received confirm"
            return! accepting state { proposal with Pending = remaining } ctx
    | ProposeTimeout ->
        // NOTE(review): the proposer keeps the modified state here although no majority confirmed it.
        proposal.Client <! Error "prepare phase completed, but didn't received accept from majority of nodes on time"
        ctx.UnstashAll()
        return! proposer state ctx
    | _ -> return Unhandled }
/// Acceptor behavior.
/// - `Query` is answered with the stored state.
/// - `Prepare` is refused when a higher ballot was already promised or accepted; the refusal carries the last
///   accepted ballot and the stored state. Otherwise the ballot is promised and echoed back with `None`.
/// - `Accept` is confirmed with `AcceptReply` when its ballot is higher than the last accepted one and no
///   higher ballot has been promised; the new state is stored and the promise is cleared.
/// - Everything else, including an `Accept` that cannot be honoured, is reported as unhandled.
let rec acceptor (state: AcceptorState<'state>) (ctx: Actor<_>) = actor {
    let! message = ctx.Receive()
    match message with
    | Query ->
        ctx.Sender() <! state.State
        return! acceptor state ctx
    | Prepare(ballot, replyTo) ->
        // The proposal is outranked by a higher promise or, failing that, by a higher accepted ballot.
        let outranked =
            match state.Promise with
            | Some promised when promised > ballot -> true
            | _ -> state.Accepted > ballot
        if outranked then
            // NOTE(review): the refusal always carries `state.Accepted`, which may be lower than the
            // ballot that was actually promised - confirm this is intended.
            replyTo <! PrepareReply(state.Accepted, Some state.State)
            return! acceptor state ctx
        else
            logDebugf ctx "sending prepare confirmation"
            replyTo <! PrepareReply(ballot, None)
            return! acceptor { state with Promise = Some ballot } ctx
    | Accept(ballot, modified, sender) ->
        let promiseAllows =
            match state.Promise with
            | Some promised -> promised <= ballot
            | None -> true
        if ballot > state.Accepted && promiseAllows then
            logDebugf ctx "sending accept confirmation"
            sender <! AcceptReply
            return! acceptor { state with Accepted = ballot; State = modified; Promise = None } ctx
        else
            // NOTE: this is always the "funniest" part of every 2-phase commit protocol:
            // what happens when the 2nd phase fails? Here the proposer gets no reply at all.
            return Unhandled
    | _ -> return Unhandled }
module Program =
    /// Demo: starts an actor system with five acceptors ("A".."E") and one proposer ("P"), all starting
    /// from an empty set. Submits two changes (adding "hello", then "world"), prints the reply to the
    /// second one, waits for a line on the console and returns 0.
    let main () =
        let sys = System.create "sys" <| Configuration.parse "akka.loglevel = DEBUG"
        let acceptorRefs =
            [ for letter in 'A' .. 'E' ->
                let name = string letter
                spawn sys name <| props (acceptor (AcceptorState.create name Set.empty)) ]
            |> Set.ofList
        let proposerRef =
            ProposerState.create "P" Set.empty acceptorRefs
            |> proposer
            |> props
            |> spawn sys "P"
        async {
            // The first change is fire-and-forget; only the reply to the second one is awaited.
            proposerRef <! Propose(DateTime.UtcNow.AddSeconds(10.), Set.add "hello")
            let! state = proposerRef <? Propose(DateTime.UtcNow.AddSeconds(10.), Set.add "world")
            printfn "Reply: %A" state
            Console.ReadLine() |> ignore
        } |> Async.RunSynchronously
        0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment