Skip to content

Instantly share code, notes, and snippets.

@Horusiath

Horusiath/Program.fs

Last active Apr 9, 2020
Embed
What would you like to do?
Toy implementation of SWIM protocol in Akkling (Akka.NET F#)
open System
open System.Threading
open Akkling
open DemoFs
[<EntryPoint>]
let main argv =
let config = """
akka.loglevel = DEBUG
"""
use sys = System.create "swim-cluster" <| Configuration.parse config
let a = spawn sys "node-a" <| props (Swim.membership [])
Thread.Sleep(1000)
let b = spawn sys "node-b" <| props (Swim.membership [a])
Thread.Sleep(1000)
let c = spawn sys "node-c" <| props (Swim.membership [a])
Thread.Sleep(30_000)
sys.Stop (untyped b) // stop node B after 30 sec
Console.ReadLine() |> ignore
0 // return an integer exit code
module DemoFs.Swim
open Akka.Actor
open Akka.Util
open System
open Akkling
type Endpoint = IActorRef<SwimProtocol>
and SwimProtocol =
/// Triggers next ping round on current member.
| NextRound
/// Ping message send from `replyTo` to another member with `status` piggybacked on.
| Ping of status:GossipStatus * replyTo:Endpoint
/// If `Ping` or `PingReq` where not replied on time, `PingTimeout` will happen.
| PingTimeout of suspect:Member
/// Reply to `Ping` request.
| PingAck of who:Member * status:GossipStatus
/// Indirect ping request my by member, who issued `Ping` but didn't received `PingAck` before timeout.
| PingReq of suspect:Member * status:GossipStatus * replyTo:Endpoint
/// Message broadcast among all nodes once suspected member didn't respond on `Ping` request on time.
| Suspected of Member
/// Message that overrides `Suspected` and marks suspected node as alive.
| Alive of Member
/// Message that confirms that node is dead. It overrides suspected, can be issued manually for
/// graceful cluster leave.
| Left of Member
/// Request issued by node trying to join the cluster, should be responded with `Joined` broadcast
/// among all cluster members (including new node).
| Join of Member
/// If `Join` request was not responded on time, a timeout is issued and next node from contact list
/// will receive join request.
| JoinTimeout of remaining:Endpoint list
/// Positive response for `Join` request broadcast among all of the nodes.
| Joined of GossipStatus
and Member = { Endpoint: Endpoint }
and GossipStatus = Set<Member>
type Message<'M> = Endpoint * 'M
[<Struct>]
type SuspectType =
/// On actor which issued original `Ping`.
| WaitPingAck of Set<Member>
/// On actor which issues `Ping` on behalf of another actor who sent it `PingReq`
| WaitPingReqAck
/// On actors who received `Suspected` notice.
| WaitConfirm
type SuspectState = ICancelable * SuspectType
type MembershipState =
{ Myself: Member
Active: Set<Member>
Suspects: Map<Member, SuspectState> }
static member Create (endpoint: Endpoint) =
let myself = { Endpoint = endpoint }
{ Myself = myself
Active = Set.empty
Suspects = Map.empty }
let pingTimeout = TimeSpan.FromSeconds 10.
let indirectPingTimeout = TimeSpan.FromSeconds 5.
let pingInterval = TimeSpan.FromSeconds 30.
let joinTimeout = TimeSpan.FromSeconds 30.
let suspicionTimeout = TimeSpan.FromSeconds 30.
let membership (seeds: Endpoint list) (ctx: Actor<_>) =
let state = MembershipState.Create ctx.Self
let pick peers =
match Set.count peers with
| 0 -> None
| count ->
let idx = ThreadLocalRandom.Current.Next count
Some (Seq.item idx peers)
let leave peer state =
let active = Set.remove peer state.Active
let left = Left peer
active
|> Set.remove state.Myself
|> Set.iter (fun peer -> peer.Endpoint <! left)
{ state with Suspects = Map.remove peer state.Suspects; Active = active }
let merge gossip state =
let added = gossip - state.Active |> Set.map (fun peer -> peer.Endpoint.Path.Name)
let removed = state.Active - gossip |> Set.map (fun peer -> peer.Endpoint.Path.Name)
if not (Set.isEmpty added && Set.isEmpty removed) then
logDebugf ctx "Received gossip - added: %O, removed: %O" added removed
{ state with Active = state.Active + gossip }
let rec ready state = actor {
match! ctx.Receive() with
| NextRound ->
ctx.Schedule pingInterval ctx.Self NextRound |> ignore
let suspects = state.Suspects |> Map.toSeq |> Seq.map fst |> Set.ofSeq
let others = (Set.remove state.Myself state.Active) - suspects
// pick one member at random, other than self and not marked as suspected
match pick others with
| None -> return! ready state
| Some peer ->
peer.Endpoint <! Ping(state.Active, ctx.Self)
let cancel = ctx.Schedule pingTimeout ctx.Self (PingTimeout peer)
let skipList = Set.ofArray [| state.Myself; peer |]
return! ready { state with Suspects = Map.add peer (cancel, WaitPingAck skipList) state.Suspects }
| Ping (gossip, sender) ->
logDebugf ctx "Received PING from '%s'. Sending ACK." sender.Path.Name
sender <! PingAck(state.Myself, state.Active)
return! ready (merge gossip state)
| PingAck (who, gossip) ->
logDebugf ctx "Received ACK from '%s'." who.Endpoint.Path.Name
let newState =
match Map.tryFind who state.Suspects with
| None -> state
| Some (cancel, status) ->
cancel.Cancel()
let newState = { merge gossip state with Suspects = Map.remove who state.Suspects }
match status with
| WaitPingReqAck ->
// notify everyone that actor is alive and no longer suspected
let msg = Alive who
Set.remove newState.Myself newState.Active
|> Set.iter (fun peer -> peer.Endpoint <! msg)
| _ -> ()
newState
return! ready (merge gossip newState)
| PingReq (suspect, gossip, sender) ->
logDebugf ctx "Received PING REQ: '%s' -> '%s'." sender.Path.Name suspect.Endpoint.Path.Name
let cancel = ctx.Schedule indirectPingTimeout ctx.Self (PingTimeout suspect)
suspect.Endpoint <! Ping(state.Active, ctx.Self)
return! ready { merge gossip state with Suspects = Map.add suspect (cancel, WaitPingReqAck) state.Suspects }
| Suspected suspect when suspect = state.Myself ->
(Set.remove state.Myself state.Active)
|> Set.iter (fun peer -> peer.Endpoint <! Alive state.Myself)
return! ready state
| Suspected suspect ->
let cancel = ctx.Schedule suspicionTimeout ctx.Self (Alive suspect)
match Map.tryFind suspect state.Suspects with
| None -> return! ready { state with Suspects = Map.add suspect (cancel, WaitConfirm) state.Suspects }
| _ -> return! ready state
| Alive peer ->
let newState =
match Map.tryFind peer state.Suspects with
| None -> state
| Some (cancel, _) ->
cancel.Cancel()
{ state with Suspects = Map.remove peer state.Suspects }
return! ready newState
| PingTimeout suspect ->
match Map.tryFind suspect state.Suspects with
| None -> return! ready state // duplicate or invalidated
| Some (_, WaitPingReqAck) ->
logDebugf ctx "REQ ACK timeout. '%s' confirmed dead." suspect.Endpoint.Path.Name
return! ready (leave suspect state)
| Some (_, WaitPingAck skipList) ->
let peers = state.Active - skipList
logDebugf ctx "ACK timeout from '%s'. Picking from: %O" suspect.Endpoint.Path.Name (peers |> Set.map (fun p -> p.Endpoint.Path.Name))
match pick peers with
| Some other -> // ask someone else to make a ping to confirm suspect is dead or alive
other.Endpoint <! PingReq(suspect, state.Active, ctx.Self)
let cancel = ctx.Schedule pingTimeout ctx.Self (PingTimeout suspect)
return! ready { state with Suspects = Map.add suspect (cancel, WaitPingAck(Set.add other skipList)) state.Suspects }
| None -> return! ready (leave suspect state) // no one left to ask, mark suspect as dead
| _ -> return Unhandled
| Join peer ->
logDebugf ctx "Peer '%s' requests to join the cluster." peer.Endpoint.Path.Name
let gossip = Set.add peer state.Active
let msg = Joined gossip
gossip
|> Set.remove state.Myself
|> Set.iter (fun peer -> peer.Endpoint <! msg)
return! ready { state with Active = gossip }
| Joined gossip ->
return! ready (merge gossip state)
| Left peer ->
logDebugf ctx "Peer '%s' left the cluster." peer.Endpoint.Path.Name
match Map.tryFind peer state.Suspects with
| None -> return! ready { state with Active = Set.remove peer state.Active }
| Some (cancel, _) ->
cancel.Cancel()
return! ready { state with Active = Set.remove peer state.Active; Suspects = Map.remove peer state.Suspects }
| _ -> return Unhandled
}
let becomeReady state gossip =
logInfof ctx "%A successfully joined SWIM cluster: %A" state.Myself.Endpoint.Path.Name (gossip |> Set.map (fun p -> p.Endpoint.Path.Name))
ctx.Schedule pingInterval ctx.Self NextRound |> ignore
ready (merge gossip state)
let rec joining state cancel = actor {
match! ctx.Receive() with
| JoinTimeout [] ->
logErrorf ctx "Failed to join any of the members. Shutting down"
return Stop
| JoinTimeout (next::remaining) ->
logDebugf ctx "Failed to join to a member. Try to join %O" next.Path.Name
next <! Join state.Myself
let cancel = ctx.Schedule joinTimeout ctx.Self (JoinTimeout remaining)
return! joining state cancel
| Joined gossip ->
cancel.Cancel() // cancel join timeout
return! becomeReady state gossip
| Join peer when peer = state.Myself -> // establish new cluster
cancel.Cancel() // cancel join timeout
return! becomeReady state (Set.singleton state.Myself)
| _ -> return Unhandled
}
match seeds with
| [] -> becomeReady state (Set.singleton state.Myself)
| seed::remaining ->
seed <! Join state.Myself
let cancel = ctx.Schedule joinTimeout ctx.Self (JoinTimeout remaining)
joining state cancel
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment