Toy implementation of the SWIM membership protocol in Akkling (Akka.NET for F#)
open System | |
open System.Threading | |
open Akkling | |
open DemoFs | |
/// Demo entry point: starts a three-member SWIM cluster in one actor system, lets it run,
/// then stops one member and waits for a key press before exiting.
[<EntryPoint>]
let main argv =
    // Debug-level logging so the membership actors' logDebugf output is visible.
    let config = """
        akka.loglevel = DEBUG
    """
    use sys = Configuration.parse config |> System.create "swim-cluster"

    // Spawns one SWIM member named `name` that joins through `seeds`
    // (an empty seed list makes the member bootstrap a new cluster).
    let spawnNode name seeds = spawn sys name (props (Swim.membership seeds))
    let pause (seconds: float) = Thread.Sleep(TimeSpan.FromSeconds seconds)

    let nodeA = spawnNode "node-a" []
    pause 1.0
    let nodeB = spawnNode "node-b" [ nodeA ]
    pause 1.0
    let _nodeC = spawnNode "node-c" [ nodeA ]
    pause 30.0

    // Stop node-b about 30 seconds after the last member was spawned.
    sys.Stop(untyped nodeB)
    Console.ReadLine() |> ignore
    0 // process exit code
module DemoFs.Swim | |
open Akka.Actor | |
open Akka.Util | |
open System | |
open Akkling | |
/// Typed reference to a SWIM member actor; every protocol message is delivered through it.
type Endpoint = IActorRef<SwimProtocol>
/// Messages exchanged between SWIM members, including messages a member schedules to itself.
and SwimProtocol =
    /// Triggers the next ping round on the current member.
    | NextRound
    /// Ping message sent from `replyTo` to another member, with `status` (the sender's view of the
    /// active members) piggybacked on it.
    | Ping of status:GossipStatus * replyTo:Endpoint
    /// Issued when a `Ping` or `PingReq` was not replied to on time.
    | PingTimeout of suspect:Member
    /// Reply to a `Ping` request, carrying the replier's own view of the active members.
    | PingAck of who:Member * status:GossipStatus
    /// Indirect ping request made by a member that issued a `Ping` but didn't receive a `PingAck`
    /// before the timeout.
    | PingReq of suspect:Member * status:GossipStatus * replyTo:Endpoint
    /// Message broadcast among all nodes once a suspected member didn't respond to a `Ping` request
    /// on time. NOTE(review): nothing in the visible code sends this message — confirm.
    | Suspected of Member
    /// Message that overrides `Suspected` and marks the suspected node as alive.
    | Alive of Member
    /// Message that confirms that a node is dead. It overrides `Suspected` and can be issued
    /// manually for a graceful cluster leave.
    | Left of Member
    /// Request issued by a node trying to join the cluster; it should be answered with a `Joined`
    /// broadcast among all cluster members (including the new node).
    | Join of Member
    /// If a `Join` request was not answered on time, a timeout is issued and the next node from
    /// the contact list receives the join request.
    | JoinTimeout of remaining:Endpoint list
    /// Positive response to a `Join` request, broadcast among all of the nodes.
    | Joined of GossipStatus
/// Identity of a cluster member. It is used as a `Set`/`Map` key, so it relies on `Endpoint`
/// supporting comparison — NOTE(review): confirm for the Akkling typed ref in use.
and Member = { Endpoint: Endpoint }
/// A member's view of the cluster membership, piggybacked on protocol messages.
and GossipStatus = Set<Member>
/// An endpoint paired with a payload. NOTE(review): not referenced anywhere in the visible code.
type Message<'M> = Endpoint * 'M
/// What a member is waiting for while it tracks a suspect.
[<Struct>]
type SuspectType =
    /// On the actor that issued the original `Ping`. Carries the members that must not be chosen
    /// as `PingReq` intermediaries (self, the suspect, and intermediaries already asked).
    | WaitPingAck of Set<Member>
    /// On the actor that issues a `Ping` on behalf of another actor that sent it a `PingReq`.
    | WaitPingReqAck
    /// On actors that received a `Suspected` notice.
    | WaitConfirm
/// Handle for cancelling the timer scheduled for a suspect, paired with what is awaited from it.
type SuspectState = ICancelable * SuspectType
/// Per-member state of the SWIM membership actor.
type MembershipState =
    { /// This member's own identity.
      Myself: Member
      /// Members this node currently considers part of the cluster.
      Active: Set<Member>
      /// Members with an outstanding probe or suspicion, with the timer that guards each one.
      Suspects: Map<Member, SuspectState> }

    /// Initial state for the member reachable at `endpoint`: no known peers and no suspects.
    static member Create (endpoint: Endpoint) : MembershipState =
        { Myself = { Endpoint = endpoint }
          Active = Set.empty
          Suspects = Map.empty }
/// How long a member waits for a `PingAck` to a `Ping` it sent for its own probe round.
let pingTimeout = TimeSpan(0, 0, 10)
/// How long a member acting on a `PingReq` waits for the suspect's `PingAck`.
let indirectPingTimeout = TimeSpan(0, 0, 5)
/// Delay between consecutive `NextRound` probe rounds.
let pingInterval = TimeSpan(0, 0, 30)
/// How long a joining member waits for `Joined` before contacting the next seed.
let joinTimeout = TimeSpan(0, 0, 30)
/// How long a `Suspected` notice is tracked before the scheduled `Alive` clears it.
let suspicionTimeout = TimeSpan(0, 0, 30)
/// Builds the actor behavior of one SWIM cluster member.
///
/// With an empty `seeds` list the member bootstraps a new cluster on its own. Otherwise it sends
/// `Join` to the first seed and retries with the next seed on every `JoinTimeout`, stopping itself
/// once all seeds have timed out. After joining, it pings one random, non-suspected peer every
/// `pingInterval`. An unanswered ping is escalated to `PingReq` probes through other members, and
/// a member that cannot be reached is announced to the others with `Left`.
let membership (seeds: Endpoint list) (ctx: Actor<_>) =
    let state = MembershipState.Create ctx.Self

    /// Picks a uniformly random member of `peers`, or `None` when `peers` is empty.
    let pick peers =
        match Set.count peers with
        | 0 -> None
        | count ->
            let idx = ThreadLocalRandom.Current.Next count
            Some (Seq.item idx peers)

    /// Drops `peer` from the active set and the suspects map, and broadcasts `Left peer` to every
    /// remaining member other than this one. It does not cancel the peer's timer: it is only called
    /// from the `PingTimeout` handler, i.e. after that timer has already fired.
    let leave peer state =
        let active = Set.remove peer state.Active
        let left = Left peer
        active
        |> Set.remove state.Myself
        |> Set.iter (fun peer -> peer.Endpoint <! left)
        { state with Suspects = Map.remove peer state.Suspects; Active = active }

    /// Unions received `gossip` into the active set. Members missing from `gossip` are only
    /// reported in the debug log as "removed"; they are dropped solely via `Left`/`leave`.
    let merge gossip state =
        let added = gossip - state.Active |> Set.map (fun peer -> peer.Endpoint.Path.Name)
        let removed = state.Active - gossip |> Set.map (fun peer -> peer.Endpoint.Path.Name)
        if not (Set.isEmpty added && Set.isEmpty removed) then
            logDebugf ctx "Received gossip - added: %O, removed: %O" added removed
        { state with Active = state.Active + gossip }

    /// Cancels the timer currently stored for `peer` in `state.Suspects`, if there is one.
    let cancelTimer peer state =
        match Map.tryFind peer state.Suspects with
        | Some (cancel: ICancelable, _) -> cancel.Cancel()
        | None -> ()

    /// Behavior of a member that has joined the cluster.
    let rec ready state = actor {
        match! ctx.Receive() with
        | NextRound ->
            ctx.Schedule pingInterval ctx.Self NextRound |> ignore
            let suspects = state.Suspects |> Map.toSeq |> Seq.map fst |> Set.ofSeq
            // pick one member at random, other than self and not currently suspected
            let others = (Set.remove state.Myself state.Active) - suspects
            match pick others with
            | None -> return! ready state
            | Some peer ->
                peer.Endpoint <! Ping(state.Active, ctx.Self)
                let cancel = ctx.Schedule pingTimeout ctx.Self (PingTimeout peer)
                // self and the probed peer are never valid intermediaries for a later PingReq
                let skipList = Set.ofArray [| state.Myself; peer |]
                return! ready { state with Suspects = Map.add peer (cancel, WaitPingAck skipList) state.Suspects }
        | Ping (gossip, sender) ->
            logDebugf ctx "Received PING from '%s'. Sending ACK." sender.Path.Name
            sender <! PingAck(state.Myself, state.Active)
            return! ready (merge gossip state)
        | PingAck (who, gossip) ->
            logDebugf ctx "Received ACK from '%s'." who.Endpoint.Path.Name
            // Merge exactly once: merging the same gossip a second time only repeats the
            // "removed" debug line.
            let merged = merge gossip state
            let newState =
                match Map.tryFind who merged.Suspects with
                | None -> merged
                | Some (cancel, status) ->
                    cancel.Cancel()
                    let cleared = { merged with Suspects = Map.remove who merged.Suspects }
                    match status with
                    | WaitPingReqAck ->
                        // notify everyone that the actor is alive and no longer suspected
                        let msg = Alive who
                        Set.remove cleared.Myself cleared.Active
                        |> Set.iter (fun peer -> peer.Endpoint <! msg)
                    | _ -> ()
                    cleared
            return! ready newState
        | PingReq (suspect, gossip, sender) ->
            logDebugf ctx "Received PING REQ: '%s' -> '%s'." sender.Path.Name suspect.Endpoint.Path.Name
            // Map.add below replaces any entry already held for the suspect. Cancel that entry's
            // timer first: otherwise its stale PingTimeout would be matched against the new
            // WaitPingReqAck entry and declare the suspect dead too early.
            cancelTimer suspect state
            let cancel = ctx.Schedule indirectPingTimeout ctx.Self (PingTimeout suspect)
            suspect.Endpoint <! Ping(state.Active, ctx.Self)
            let merged = merge gossip state
            return! ready { merged with Suspects = Map.add suspect (cancel, WaitPingReqAck) merged.Suspects }
        | Suspected suspect when suspect = state.Myself ->
            // refute a suspicion about ourselves
            Set.remove state.Myself state.Active
            |> Set.iter (fun peer -> peer.Endpoint <! Alive state.Myself)
            return! ready state
        | Suspected suspect ->
            match Map.tryFind suspect state.Suspects with
            | None ->
                // Arm the timer only when we start tracking the suspect. A timer armed for an
                // already-tracked suspect would be orphaned, and its `Alive` would later clear
                // (and cancel) that existing entry.
                let cancel = ctx.Schedule suspicionTimeout ctx.Self (Alive suspect)
                return! ready { state with Suspects = Map.add suspect (cancel, WaitConfirm) state.Suspects }
            | Some _ -> return! ready state
        | Alive peer ->
            let newState =
                match Map.tryFind peer state.Suspects with
                | None -> state
                | Some (cancel, _) ->
                    cancel.Cancel()
                    { state with Suspects = Map.remove peer state.Suspects }
            return! ready newState
        | PingTimeout suspect ->
            match Map.tryFind suspect state.Suspects with
            | None -> return! ready state // duplicate or invalidated
            | Some (_, WaitPingReqAck) ->
                logDebugf ctx "REQ ACK timeout. '%s' confirmed dead." suspect.Endpoint.Path.Name
                return! ready (leave suspect state)
            | Some (_, WaitPingAck skipList) ->
                let peers = state.Active - skipList
                logDebugf ctx "ACK timeout from '%s'. Picking from: %O" suspect.Endpoint.Path.Name (peers |> Set.map (fun p -> p.Endpoint.Path.Name))
                match pick peers with
                | Some other -> // ask someone else to ping the suspect to confirm whether it is dead or alive
                    other.Endpoint <! PingReq(suspect, state.Active, ctx.Self)
                    let cancel = ctx.Schedule pingTimeout ctx.Self (PingTimeout suspect)
                    return! ready { state with Suspects = Map.add suspect (cancel, WaitPingAck(Set.add other skipList)) state.Suspects }
                | None -> return! ready (leave suspect state) // no one left to ask, mark suspect as dead
            | _ -> return Unhandled
        | Join peer ->
            logDebugf ctx "Peer '%s' requests to join the cluster." peer.Endpoint.Path.Name
            let gossip = Set.add peer state.Active
            let msg = Joined gossip
            // the joining peer is part of `gossip`, so it receives `Joined` too
            gossip
            |> Set.remove state.Myself
            |> Set.iter (fun peer -> peer.Endpoint <! msg)
            return! ready { state with Active = gossip }
        | Joined gossip ->
            return! ready (merge gossip state)
        | Left peer ->
            logDebugf ctx "Peer '%s' left the cluster." peer.Endpoint.Path.Name
            match Map.tryFind peer state.Suspects with
            | None -> return! ready { state with Active = Set.remove peer state.Active }
            | Some (cancel, _) ->
                cancel.Cancel()
                return! ready { state with Active = Set.remove peer state.Active; Suspects = Map.remove peer state.Suspects }
        | _ -> return Unhandled
    }

    /// Switches to `ready` with `gossip` as the initial membership view and schedules the first
    /// probe round.
    let becomeReady state gossip =
        logInfof ctx "%A successfully joined SWIM cluster: %A" state.Myself.Endpoint.Path.Name (gossip |> Set.map (fun p -> p.Endpoint.Path.Name))
        ctx.Schedule pingInterval ctx.Self NextRound |> ignore
        ready (merge gossip state)

    /// Behavior while waiting for `Joined`; `cancel` guards the currently pending join timeout.
    let rec joining state (cancel: ICancelable) = actor {
        match! ctx.Receive() with
        | JoinTimeout [] ->
            logErrorf ctx "Failed to join any of the members. Shutting down"
            return Stop
        | JoinTimeout (next :: remaining) ->
            logDebugf ctx "Failed to join to a member. Try to join %O" next.Path.Name
            next <! Join state.Myself
            let cancel = ctx.Schedule joinTimeout ctx.Self (JoinTimeout remaining)
            return! joining state cancel
        | Joined gossip ->
            cancel.Cancel() // cancel join timeout
            return! becomeReady state gossip
        | Join peer when peer = state.Myself -> // our own Join came back: establish a new cluster
            cancel.Cancel() // cancel join timeout
            return! becomeReady state (Set.singleton state.Myself)
        | _ -> return Unhandled
    }

    match seeds with
    | [] -> becomeReady state (Set.singleton state.Myself)
    | seed :: remaining ->
        seed <! Join state.Myself
        let cancel = ctx.Schedule joinTimeout ctx.Self (JoinTimeout remaining)
        joining state cancel
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment