Skip to content

Instantly share code, notes, and snippets.

@Horusiath

Horusiath/Program.fs

Last active Apr 9, 2020
Embed
What would you like to do?
Toy implementation of SWIM protocol in Akkling (Akka.NET F#)
open System
open System.Threading
open Akkling
open DemoFs
/// Demo entry point: starts a local actor system, spawns three SWIM membership
/// actors (`node-a` as the seed, `node-b` and `node-c` joining through it) and
/// stops `node-b` after 30 seconds. The process then blocks until a line is
/// read from standard input.
[<EntryPoint>]
let main argv =
    // HOCON configuration text; the literal is kept flush-left so its content is unchanged.
    let hocon = """
akka.loglevel = DEBUG
"""
    use system = System.create "swim-cluster" (Configuration.parse hocon)

    // Spawns one membership actor under `name`, using `seeds` as its contact list.
    let spawnNode name seeds = spawn system name (props (Swim.membership seeds))

    let nodeA = spawnNode "node-a" []
    Thread.Sleep(1000)
    let nodeB = spawnNode "node-b" [ nodeA ]
    Thread.Sleep(1000)
    let _nodeC = spawnNode "node-c" [ nodeA ]
    Thread.Sleep(30_000)
    // stop node B after 30 sec
    system.Stop (untyped nodeB)
    ignore (Console.ReadLine())
    // return an integer exit code
    0
module DemoFs.Swim
open Akka.Actor
open Akka.Util
open System
open Akkling
/// Typed actor reference accepting `SwimProtocol` messages; this is how a member is addressed.
type Endpoint = IActorRef<SwimProtocol>
/// Messages handled by the membership actor: protocol traffic between members
/// plus the timer messages the actor schedules to itself.
and SwimProtocol =
/// Triggers the next ping round on the current member.
| NextRound
/// Ping message sent from `replyTo` to another member, with `status` piggybacked on it.
| Ping of status:GossipStatus * replyTo:Endpoint
/// If a `Ping` or `PingReq` was not replied to in time, `PingTimeout` is delivered for the `suspect`.
| PingTimeout of suspect:Member
/// Reply to a `Ping` request.
| PingAck of who:Member * status:GossipStatus
/// Indirect ping request made by a member who issued a `Ping` but didn't receive a `PingAck` before the timeout.
| PingReq of suspect:Member * status:GossipStatus * replyTo:Endpoint
/// Message broadcast among all nodes once a suspected member didn't respond to a `Ping` request in time.
| Suspected of Member
/// Message that overrides `Suspected` and marks the suspected node as alive.
| Alive of Member
/// Message that confirms that a node is dead. It overrides `Suspected` and can be issued manually for
/// a graceful cluster leave.
| Left of Member
/// Request issued by a node trying to join the cluster. It should be answered with a `Joined` broadcast
/// among all cluster members (including the new node).
| Join of Member
/// If a `Join` request was not answered in time, this timeout is issued and the next node from the
/// contact list (`remaining`) will receive the join request.
| JoinTimeout of remaining:Endpoint list
/// Positive response to a `Join` request, broadcast among all of the nodes.
| Joined of GossipStatus
/// A cluster member, identified solely by its endpoint.
and Member = { Endpoint: Endpoint }
/// Set of members piggybacked on protocol messages and merged into the receiver's active set.
and GossipStatus = Set<Member>
/// An endpoint paired with a payload of type `'M`.
/// NOTE(review): not referenced anywhere in the code visible here — confirm whether it is still needed.
type Message<'M> = Endpoint * 'M
/// Why a member is currently tracked in `MembershipState.Suspects`, i.e. which
/// reply the local actor is waiting for.
[<Struct>]
type SuspectType =
/// On the actor which issued the original `Ping`. The set holds members that must not be
/// picked for an indirect `PingReq` (initially the actor itself and the pinged peer).
| WaitPingAck of Set<Member>
/// On the actor which issues a `Ping` on behalf of another actor who sent it a `PingReq`.
| WaitPingReqAck
/// On actors who received a `Suspected` notice.
| WaitConfirm
/// Per-suspect bookkeeping: the cancelable handle of the timer scheduled for the suspect,
/// paired with the kind of reply being awaited.
type SuspectState = ICancelable * SuspectType
/// Immutable state carried by a membership actor between messages.
type MembershipState =
    { /// The member representing the local actor.
      Myself: Member
      /// Members currently believed to be part of the cluster.
      Active: Set<Member>
      /// Members with an outstanding timer, keyed by member.
      Suspects: Map<Member, SuspectState> }

    /// Builds the initial state for the actor reachable at `endpoint`:
    /// no active members and no suspects yet.
    static member Create (endpoint: Endpoint) : MembershipState =
        { Myself = { Endpoint = endpoint }
          Active = Set.empty
          Suspects = Map.empty }
/// How long to wait for a `PingAck` (after a direct `Ping` or after handing out a `PingReq`).
let pingTimeout = TimeSpan.FromSeconds 10.0

/// How long a member pinging on behalf of someone else waits for the suspect's `PingAck`.
let indirectPingTimeout = TimeSpan.FromSeconds 5.0

/// Delay between two consecutive `NextRound` ticks.
let pingInterval = TimeSpan.FromSeconds 30.0

/// How long to wait for a `Joined` reply before trying the next contact point.
let joinTimeout = TimeSpan.FromSeconds 30.0

/// How long a member stays in the `WaitConfirm` state after a `Suspected` notice.
let suspicionTimeout = TimeSpan.FromSeconds 30.0
/// Behavior of a single SWIM cluster member.
///
/// `seeds` is the contact list used to join an existing cluster: a `Join` is sent to
/// the first seed and, on every `JoinTimeout`, to the next one. With an empty list the
/// actor immediately forms a cluster consisting only of itself. `ctx` is the actor
/// context of the member being started.
///
/// Returns the initial effect for the actor: either the `joining` behavior (waiting
/// for `Joined`) or the `ready` behavior (regular ping rounds).
let membership (seeds: Endpoint list) (ctx: Actor<_>) =
    let state = MembershipState.Create ctx.Self

    /// Picks a random element of `peers`, or `None` when the set is empty.
    let pick peers =
        match Set.count peers with
        | 0 -> None
        | count ->
            let idx = ThreadLocalRandom.Current.Next count
            Some (Seq.item idx peers)

    /// Sends `msg` to every active member of `state` except the local one.
    let notifyOthers (state: MembershipState) msg =
        state.Active
        |> Set.remove state.Myself
        |> Set.iter (fun other -> other.Endpoint <! msg)

    /// Removes `peer` from the active set and from the suspects, and tells the
    /// remaining members that it has left. The peer itself is not notified.
    let leave peer (state: MembershipState) =
        let remaining =
            { state with
                Suspects = Map.remove peer state.Suspects
                Active = Set.remove peer state.Active }
        notifyOthers remaining (Left peer)
        remaining

    /// Unions the received `gossip` into the active set. The "removed" part of the
    /// log line is informational only: members missing from the gossip are not
    /// dropped from the local state.
    let merge gossip state =
        let added = gossip - state.Active |> Set.map (fun peer -> peer.Endpoint.Path.Name)
        let removed = state.Active - gossip |> Set.map (fun peer -> peer.Endpoint.Path.Name)
        if not (Set.isEmpty added && Set.isEmpty removed) then
            logDebugf ctx "Received gossip - added: %O, removed: %O" added removed
        { state with Active = state.Active + gossip }

    /// Regular operation: periodic ping rounds plus handling of protocol messages.
    let rec ready state = actor {
        match! ctx.Receive() with
        | NextRound ->
            ctx.Schedule pingInterval ctx.Self NextRound |> ignore
            let suspects = state.Suspects |> Map.toSeq |> Seq.map fst |> Set.ofSeq
            let others = (Set.remove state.Myself state.Active) - suspects
            // pick one member at random, other than self and not marked as suspected
            match pick others with
            | None -> return! ready state
            | Some peer ->
                peer.Endpoint <! Ping(state.Active, ctx.Self)
                let cancel = ctx.Schedule pingTimeout ctx.Self (PingTimeout peer)
                // neither ourselves nor the pinged peer may be asked for an indirect ping
                let skipList = Set.ofArray [| state.Myself; peer |]
                return! ready { state with Suspects = Map.add peer (cancel, WaitPingAck skipList) state.Suspects }

        | Ping (gossip, sender) ->
            logDebugf ctx "Received PING from '%s'. Sending ACK." sender.Path.Name
            sender <! PingAck(state.Myself, state.Active)
            return! ready (merge gossip state)

        | PingAck (who, gossip) ->
            logDebugf ctx "Received ACK from '%s'." who.Endpoint.Path.Name
            // Merge exactly once; merging twice only repeated the gossip debug log.
            let merged = merge gossip state
            let newState =
                match Map.tryFind who merged.Suspects with
                | None -> merged
                | Some (cancel, status) ->
                    cancel.Cancel()
                    match status with
                    | WaitPingReqAck ->
                        // notify everyone that actor is alive and no longer suspected
                        notifyOthers merged (Alive who)
                    | _ -> ()
                    { merged with Suspects = Map.remove who merged.Suspects }
            return! ready newState

        | PingReq (suspect, gossip, sender) ->
            logDebugf ctx "Received PING REQ: '%s' -> '%s'." sender.Path.Name suspect.Endpoint.Path.Name
            // The entry for this suspect is about to be replaced: cancel its pending timer
            // first, otherwise the old timeout would fire against the new state.
            Map.tryFind suspect state.Suspects
            |> Option.iter (fun (pending, _) -> pending.Cancel())
            let cancel = ctx.Schedule indirectPingTimeout ctx.Self (PingTimeout suspect)
            suspect.Endpoint <! Ping(state.Active, ctx.Self)
            return! ready { merge gossip state with Suspects = Map.add suspect (cancel, WaitPingReqAck) state.Suspects }

        | Suspected suspect when suspect = state.Myself ->
            // we are the one being suspected: tell everybody else that we are alive
            notifyOthers state (Alive state.Myself)
            return! ready state

        | Suspected suspect ->
            match Map.tryFind suspect state.Suspects with
            | Some _ ->
                // Already tracked: do not schedule anything. A timer created here could
                // never be cancelled and would later deliver a stray `Alive`.
                return! ready state
            | None ->
                let cancel = ctx.Schedule suspicionTimeout ctx.Self (Alive suspect)
                return! ready { state with Suspects = Map.add suspect (cancel, WaitConfirm) state.Suspects }

        | Alive peer ->
            let newState =
                match Map.tryFind peer state.Suspects with
                | None -> state
                | Some (cancel, _) ->
                    cancel.Cancel()
                    { state with Suspects = Map.remove peer state.Suspects }
            return! ready newState

        | PingTimeout suspect ->
            match Map.tryFind suspect state.Suspects with
            | None -> return! ready state // duplicate or invalidated
            | Some (_, WaitPingReqAck) ->
                logDebugf ctx "REQ ACK timeout. '%s' confirmed dead." suspect.Endpoint.Path.Name
                return! ready (leave suspect state)
            | Some (_, WaitPingAck skipList) ->
                let peers = state.Active - skipList
                logDebugf ctx "ACK timeout from '%s'. Picking from: %O" suspect.Endpoint.Path.Name (peers |> Set.map (fun p -> p.Endpoint.Path.Name))
                match pick peers with
                | Some other -> // ask someone else to make a ping to confirm suspect is dead or alive
                    other.Endpoint <! PingReq(suspect, state.Active, ctx.Self)
                    let cancel = ctx.Schedule pingTimeout ctx.Self (PingTimeout suspect)
                    return! ready { state with Suspects = Map.add suspect (cancel, WaitPingAck(Set.add other skipList)) state.Suspects }
                | None -> return! ready (leave suspect state) // no one left to ask, mark suspect as dead
            | Some (_, WaitConfirm) -> return Unhandled

        | Join peer ->
            logDebugf ctx "Peer '%s' requests to join the cluster." peer.Endpoint.Path.Name
            let gossip = Set.add peer state.Active
            let joined = { state with Active = gossip }
            // everybody but us, including the newcomer, learns about the new member set
            notifyOthers joined (Joined gossip)
            return! ready joined

        | Joined gossip ->
            return! ready (merge gossip state)

        | Left peer ->
            logDebugf ctx "Peer '%s' left the cluster." peer.Endpoint.Path.Name
            match Map.tryFind peer state.Suspects with
            | None -> return! ready { state with Active = Set.remove peer state.Active }
            | Some (cancel, _) ->
                cancel.Cancel()
                return! ready { state with Active = Set.remove peer state.Active; Suspects = Map.remove peer state.Suspects }

        // join timers are only acted upon while joining
        | JoinTimeout _ -> return Unhandled
    }

    /// Logs the successful join, schedules the first ping round and switches to `ready`.
    let becomeReady state gossip =
        logInfof ctx "%A successfully joined SWIM cluster: %A" state.Myself.Endpoint.Path.Name (gossip |> Set.map (fun p -> p.Endpoint.Path.Name))
        ctx.Schedule pingInterval ctx.Self NextRound |> ignore
        ready (merge gossip state)

    /// Waiting for a `Joined` reply; `cancel` is the handle of the pending join timeout.
    let rec joining state cancel = actor {
        match! ctx.Receive() with
        | JoinTimeout [] ->
            logErrorf ctx "Failed to join any of the members. Shutting down"
            return Stop
        | JoinTimeout (next::remaining) ->
            logDebugf ctx "Failed to join to a member. Try to join %O" next.Path.Name
            next <! Join state.Myself
            let cancel = ctx.Schedule joinTimeout ctx.Self (JoinTimeout remaining)
            return! joining state cancel
        | Joined gossip ->
            cancel.Cancel() // cancel join timeout
            return! becomeReady state gossip
        | Join peer when peer = state.Myself -> // establish new cluster
            cancel.Cancel() // cancel join timeout
            return! becomeReady state (Set.singleton state.Myself)
        | _ -> return Unhandled
    }

    match seeds with
    | [] -> becomeReady state (Set.singleton state.Myself)
    | seed::remaining ->
        seed <! Join state.Myself
        let cancel = ctx.Schedule joinTimeout ctx.Self (JoinTimeout remaining)
        joining state cancel
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.