// HyParView: a membership protocol for reliable gossip-based broadcast
// Reference: https://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf
module Protocols.Hyparview

open System
open System.Runtime.ExceptionServices
open System.Threading

/// Address of a cluster member (the default config uses the "host:port" form);
/// it doubles as the member's identifier.
type Endpoint = string

/// Remaining number of hops a random-walk message may still travel.
type TTL = int
/// Tunable parameters of a single HyParView node.
type Config =
    {
        /// Identifier of the current node.
        LocalEndpoint: Endpoint
        /// Initial TTL of the `ForwardJoin` random walk started when a peer joins.
        ActiveRandomWalkLength: int
        /// TTL value at which a forwarded `ForwardJoin` also records the joining peer in the passive view.
        PassiveRandomWalkLength: int
        /// Maximum number of peers kept in the active view.
        ActiveViewCapacity: int
        /// Maximum number of peers kept in the passive view.
        PassiveViewCapacity: int
        /// Initial TTL of an outgoing `Shuffle` request.
        ShuffleTTL: int
        /// Number of active peers sampled into an outgoing `Shuffle` request.
        ShuffleActiveViewCount: int
        /// Number of passive peers sampled into an outgoing `Shuffle` request.
        ShufflePassiveViewCount: int
        /// Period of the timer that starts a shuffle round.
        ShuffleInterval: TimeSpan
    }

    /// `Config.Default` with `LocalEndpoint` replaced by `endpoint`.
    static member Create(endpoint) = { Config.Default with LocalEndpoint = endpoint }

    /// Baseline parameters (local endpoint "localhost:5000", 60 s shuffle period).
    static member Default =
        { LocalEndpoint = "localhost:5000"
          ActiveRandomWalkLength = 5
          PassiveRandomWalkLength = 2
          ActiveViewCapacity = 4
          PassiveViewCapacity = 24
          ShuffleTTL = 2
          ShuffleActiveViewCount = 2
          ShufflePassiveViewCount = 2
          ShuffleInterval = TimeSpan.FromSeconds 60. }
/// Payload of a `Shuffle` request travelling through the overlay.
type Shuffle =
    {
        /// Node that started the shuffle; it receives the eventual `ShuffleReply`.
        Origin: Endpoint
        /// Sample of endpoints offered for exchange.
        Nodes: Set<Endpoint>
        /// Hops left; the node that receives it with TTL 0 accepts the request.
        Ttl: TTL
    }
/// Protocol messages exchanged between nodes, plus the local `DoShuffle` trigger.
type Message =
    /// Sender asks to join the cluster through the recipient.
    | Join
    /// Gossiped after a `Join` to introduce `peer` to other members; `ttl` is the remaining walk length.
    | ForwardJoin of peer:Endpoint * ttl:TTL
    /// Local trigger that starts a shuffle round.
    | DoShuffle
    /// Request to exchange a sample of known endpoints, used to refresh passive views.
    | Shuffle of Shuffle
    /// Answer to a `Shuffle`, carrying a sample of the replier's passive view.
    | ShuffleReply of Set<Endpoint>
    /// Sender asks to be added to the recipient's active view; a `highPriority` request is always accepted.
    | Neighbor of highPriority:bool
    /// Disconnect notice. With `alive` set the sender may be kept in the passive view for later
    /// reconnection; with `respond` set the recipient answers with its own
    /// `Disconnect` (alive=true, respond=false).
    | Disconnect of alive:bool * respond:bool

    /// True for messages that may open a connection to the sender. After handling such a message
    /// the connection is closed again if the sender did not end up in the active view.
    member this.CanInit =
        match this with
        | Join
        | ForwardJoin _
        | Shuffle _
        | ShuffleReply _
        | Neighbor _ -> true
        | Disconnect _
        | DoShuffle -> false
/// A message together with the endpoint it is attributed to (normally its sender).
[<Struct>]
type Envelope = { Peer: Endpoint; Message: Message }

/// Active-view changes reported through `Reactor.Notify`.
type PeerEvent =
    /// A peer has been added to the active view.
    | NeighborUp of Endpoint
    /// A peer has been removed from the active view.
    | NeighborDown of Endpoint
/// Side-effect boundary of the protocol: transport plus event notifications.
[<Interface>]
type Reactor =
    /// Delivers `envelope` to `target`, connecting to it first if the two nodes
    /// were not connected yet.
    abstract Send: target:Endpoint * envelope:Envelope -> Async<unit>
    /// Closes a connection previously opened by `Send`.
    abstract Disconnect: Endpoint -> Async<unit>
    /// Publishes an active-view change.
    abstract Notify: PeerEvent -> unit
/// Per-node protocol state. Handlers return an updated copy instead of mutating it.
type PeerState =
    { ActiveView: Set<Endpoint>
      PassiveView: Set<Endpoint>
      Config: Config
      Random: Random
      Output: Reactor }

    /// Fresh state with empty active and passive views.
    static member Create (config: Config) (out: Reactor) (random: Random) =
        { Config = config
          Output = out
          Random = random
          ActiveView = Set.empty
          PassiveView = Set.empty }

    /// Endpoint of the local node.
    member this.Self = this.Config.LocalEndpoint

    /// Sends `msg` to `target`, stamped with the local endpoint as its sender.
    member this.Send(target, msg: Message) =
        let envelope = { Peer = this.Self; Message = msg }
        this.Output.Send(target, envelope)
type Random with
    /// Picks a uniformly random element of `set`, or `None` when the set is empty.
    member this.Pick(set: Set<'a>): 'a option =
        match set.Count with
        | 0 -> None
        | count ->
            let index = this.Next(count)
            set |> Seq.skip index |> Seq.tryHead

    /// Returns the elements of `set` in a random order (one `Next()` draw per element).
    member this.Shuffle(set: Set<'a>): 'a seq =
        let randomKey (_: 'a) = this.Next()
        Seq.sortBy randomKey set
/// True once the passive view holds `PassiveViewCapacity` peers.
let private isPassiveViewFull (state: PeerState) =
    state.Config.PassiveViewCapacity <= state.PassiveView.Count

/// Adds `peer` to the passive view unless it is the local node or already known
/// (active or passive). When the passive view is full, a random member is evicted first.
let private addPassive (peer: Endpoint) (state: PeerState) =
    let alreadyKnown =
        peer = state.Self
        || Set.contains peer state.ActiveView
        || Set.contains peer state.PassiveView
    if alreadyKnown then
        state
    else
        let room =
            if not (isPassiveViewFull state) then
                state.PassiveView
            else
                match state.Random.Pick state.PassiveView with
                | Some evicted -> Set.remove evicted state.PassiveView
                | None -> state.PassiveView
        { state with PassiveView = Set.add peer room }
/// True once the active view holds `ActiveViewCapacity` peers.
let private isActiveViewFull (state: PeerState) =
    state.Config.ActiveViewCapacity <= state.ActiveView.Count
/// Removes `peer` from the active view. When `respond` is set, `peer` is first sent a
/// `Disconnect(alive=true, respond=false)`. The connection is then closed, `NeighborDown`
/// is published, and `peer` is moved into the passive view.
/// If `peer` is not in the active view, `state` is returned unchanged and no message,
/// disconnect or notification is produced.
let private removeActive (peer: Endpoint) (state: PeerState) respond = async {
    // Membership must be tested explicitly. F#'s `Set.remove` returns a new Set instance
    // for any non-empty set, even when the element is absent, so a reference-equality
    // check cannot detect the "not present" case.
    if not (Set.contains peer state.ActiveView) then
        return state
    else
        if respond then
            do! state.Send(peer, Disconnect(alive=true, respond=false))
        do! state.Output.Disconnect(peer)
        state.Output.Notify(NeighborDown peer)
        return addPassive peer { state with ActiveView = Set.remove peer state.ActiveView }
}
/// Makes room in a full active view by evicting a random member, which is told to
/// disconnect. When the view is not full, `state` is returned unchanged.
let private removeActiveIfFull state = async {
    if not (isActiveViewFull state) then
        return state
    else
        match state.Random.Pick state.ActiveView with
        | None -> return state
        | Some victim -> return! removeActive victim state true
}
/// Promotes `peer` into the active view, evicting a random active peer first if the view
/// is full. It sends `peer` a `Neighbor(highPriority)` request and publishes `NeighborUp`.
/// The local node and peers that are already active are ignored.
let private addActive (peer: Endpoint) (highPriority: bool) (state: PeerState) = async {
    let nothingToDo = (peer = state.Self) || Set.contains peer state.ActiveView
    if nothingToDo then
        return state
    else
        let! roomy = removeActiveIfFull state
        do! roomy.Send(peer, Neighbor(highPriority))
        roomy.Output.Notify(NeighborUp peer)
        return
            { roomy with
                ActiveView = Set.add peer roomy.ActiveView
                PassiveView = Set.remove peer roomy.PassiveView }
}
/// Handles a `Disconnect` attributed to `peer`.
/// If `peer` is active, it is removed via `removeActive`; `respond` controls whether a
/// `Disconnect` is sent back. The peer stays in the passive view only when `alive` is set.
/// If the active view then has room, a random passive peer is asked to become a neighbour.
/// The request is high priority when the active view is now empty.
/// A `Disconnect` from a peer that is not active leaves the state untouched.
let private onDisconnect (old: PeerState) (peer: Endpoint) alive respond = async {
    if not (Set.contains peer old.ActiveView) then
        return old
    else
        let! removed = removeActive peer old respond
        // removeActive always parks the peer in the passive view. A peer that is no longer
        // alive must not be kept there as a reconnection candidate.
        let state =
            if alive then removed
            else { removed with PassiveView = Set.remove peer removed.PassiveView }
        if not (isActiveViewFull state) then
            match state.Random.Pick(Set.remove peer state.PassiveView) with
            | Some node ->
                let highPriority = Set.isEmpty state.ActiveView
                do! state.Send(node, Neighbor(highPriority))
            | None -> ()
        return state
}
/// Starts a shuffle round. A random active peer is sent a `Shuffle` with TTL `ShuffleTTL`.
/// The request carries up to `ShuffleActiveViewCount` other active peers and up to
/// `ShufflePassiveViewCount` passive peers. Nothing happens when the active view is empty.
let private doShuffle (state: PeerState) = async {
    match state.Random.Pick state.ActiveView with
    | None -> return state
    | Some node ->
        // `Seq.truncate` rather than `Seq.take`: the views are often smaller than the
        // configured sample sizes (e.g. right after joining), and `Seq.take` throws then.
        let active =
            state.Random.Shuffle (Set.remove node state.ActiveView)
            |> Seq.truncate state.Config.ShuffleActiveViewCount
            |> Set.ofSeq
        let passive =
            state.Random.Shuffle state.PassiveView
            |> Seq.truncate state.Config.ShufflePassiveViewCount
            |> Set.ofSeq
        let msg =
            { Origin = state.Self
              Nodes = active + passive
              Ttl = state.Config.ShuffleTTL }
        do! state.Send(node, Shuffle msg)
        return state
}
/// Handles a `Neighbor` request from `peer`. It is accepted when it is high priority or
/// the active view still has room; otherwise the request is ignored.
let private onNeighbor (state: PeerState) (peer: Endpoint) highPriority = async {
    let accept = highPriority || not (isActiveViewFull state)
    if not accept then
        return state
    else
        return! addActive peer highPriority state
}
/// Handles a `Join` from `peer`. The peer is added to the active view with high priority.
/// Every other active peer is then sent a `ForwardJoin(peer, ActiveRandomWalkLength)`.
let private onJoin (state: PeerState) (peer: Endpoint) = async {
    let! joined = addActive peer true state
    let forward = ForwardJoin(peer, joined.Config.ActiveRandomWalkLength)
    let recipients = joined.ActiveView |> Set.remove peer |> Set.toList
    for node in recipients do
        do! joined.Send(node, forward)
    return joined
}
/// Handles a `ForwardJoin` for `peer` relayed by `sender`.
/// The walk ends, and `peer` becomes an active neighbour, when the TTL is 0 or there is no
/// active peer to forward to other than `sender`. Otherwise the message travels on with
/// TTL - 1. On the way, `peer` is recorded in the passive view when the TTL equals
/// `PassiveRandomWalkLength`.
let private onForwardJoin (state: PeerState) (peer: Endpoint) (sender: Endpoint) ttl = async {
    let walkEnds = (ttl = 0) || Set.isEmpty state.ActiveView
    if walkEnds then
        return! addActive peer true state
    else
        let state =
            if ttl <> state.Config.PassiveRandomWalkLength then state
            else addPassive peer state
        let candidates = Set.remove sender state.ActiveView
        match state.Random.Pick candidates with
        | Some next ->
            do! state.Send(next, ForwardJoin(peer, ttl - 1))
            return state
        | None ->
            return! addActive peer true state
}
/// Handles a `Shuffle` request received from `sender`.
/// While the TTL is positive, the request is forwarded with TTL - 1 to a random active peer
/// other than the origin and `sender`. The request is accepted instead when the TTL has run
/// out or no such peer exists (per the HyParView paper). Accepting means two things:
/// - a random sample of the passive view, at most as large as the offered set, is sent to
///   the origin in a `ShuffleReply`;
/// - the offered nodes are merged into the passive view.
let private onShuffle (state: PeerState) (shuffle: Shuffle) sender = async {
    let accept () = async {
        // `Seq.truncate`: the local passive view may hold fewer nodes than were offered,
        // in which case `Seq.take` would throw.
        let nodes =
            state.Random.Shuffle(state.PassiveView)
            |> Seq.truncate shuffle.Nodes.Count
            |> Set.ofSeq
        do! state.Send(shuffle.Origin, ShuffleReply(nodes))
        return shuffle.Nodes |> Set.fold (fun acc node -> addPassive node acc) state
    }
    // `<= 0` so a malformed negative TTL cannot keep the request walking indefinitely.
    if shuffle.Ttl <= 0 then
        return! accept ()
    else
        match state.Random.Pick(state.ActiveView - Set.ofList [shuffle.Origin; sender]) with
        | Some node ->
            do! state.Send(node, Shuffle { shuffle with Ttl = shuffle.Ttl - 1 })
            return state
        | None ->
            // Nowhere to forward: accept instead of silently dropping the request.
            return! accept ()
}
/// Merges the endpoints carried by a `ShuffleReply` into the passive view.
let private onShuffleReply (state: PeerState) (nodes: Set<Endpoint>) = async {
    let merged = Seq.fold (fun current peer -> addPassive peer current) state nodes
    return merged
}
/// Tells `peer` to disconnect and closes the connection to it, unless `peer` is the
/// local node or a member of the active view.
let private disconnectNonActivePeer (state: PeerState) (peer: Endpoint) = async {
    let keep = (peer = state.Self) || Set.contains peer state.ActiveView
    if not keep then
        do! state.Send(peer, Disconnect(alive=true, respond=false))
        do! state.Output.Disconnect(peer)
}
/// Applies one incoming envelope to `state` and returns the updated state.
/// After a message that may open a connection (`Message.CanInit`), the connection to the
/// sender is closed again if the sender is not part of the resulting active view.
let handle (state: PeerState) (e: Envelope) : Async<PeerState> = async {
    let sender = e.Peer
    let step =
        match e.Message with
        | Join -> onJoin state sender
        | ForwardJoin(peer, ttl) -> onForwardJoin state peer sender ttl
        | Neighbor highPriority -> onNeighbor state sender highPriority
        | Disconnect(alive, respond) -> onDisconnect state sender alive respond
        | Shuffle shuffle -> onShuffle state shuffle sender
        | ShuffleReply nodes -> onShuffleReply state nodes
        | DoShuffle -> doShuffle state
    let! next = step
    if e.Message.CanInit then
        do! disconnectNonActivePeer next sender
    return next
}
/// Starts a mailbox that processes envelopes one at a time, threading the state through
/// `handle`. An item may carry a reply channel. The channel receives `None` on success, or
/// `Some exn` if handling failed; on failure the previous state is kept and processing
/// continues with the next item.
let private actor (state: PeerState) =
    MailboxProcessor.Start(fun inbox ->
        // Handles one item and never throws. Keeping try/with out of `loop` keeps the
        // recursive `return!` a tail call; inside try/with each iteration would stack
        // another exception-handler frame.
        let step current (msg: Envelope, ch: AsyncReplyChannel<exn option> option) = async {
            try
                let! next = handle current msg
                ch |> Option.iter (fun c -> c.Reply None)
                return next
            with e ->
                ch |> Option.iter (fun c -> c.Reply(Some e))
                return current
        }
        let rec loop current = async {
            let! item = inbox.Receive()
            let! next = step current item
            return! loop next
        }
        loop state)
/// Public handle of a HyParView node. All protocol work runs on a private mailbox, and a
/// timer posts a `DoShuffle` every `ShuffleInterval`.
type Membership(out: Reactor, ?config: Config, ?random: Random) =
    let config = defaultArg config Config.Default
    let random =
        match random with
        | Some r -> r
        | None -> Random()
    let mailbox = actor (PeerState.Create config out random)
    let shuffle =
        let period = int config.ShuffleInterval.TotalMilliseconds
        let tick (_: obj) =
            mailbox.Post ({ Peer = config.LocalEndpoint; Message = DoShuffle }, None)
        new Timer(TimerCallback(tick), null, period, period)
    // Posts `envelope` with a reply channel and rethrows the handler's exception, if any,
    // preserving its original stack trace.
    let request (envelope: Envelope) = async {
        let! failure = mailbox.PostAndAsyncReply(fun ch -> (envelope, Some ch))
        match failure with
        | Some e -> ExceptionDispatchInfo.Capture(e).Throw()
        | None -> ()
    }

    /// Processes a `Join` attributed to `endpoint` on the local node, making `endpoint` an
    /// active neighbour. Completes once the message has been handled.
    member this.Join(endpoint: Endpoint) =
        request { Peer = endpoint; Message = Join }

    /// Processes a `Disconnect(alive=true, respond=true)` attributed to `endpoint` on the
    /// local node. Completes once the message has been handled.
    member this.Disconnect(endpoint: Endpoint) =
        request { Peer = endpoint; Message = Disconnect(true, true) }

    /// Endpoint of the local node.
    member this.Endpoint = config.LocalEndpoint

    /// Enqueues an incoming envelope without waiting for it to be processed.
    member this.Post (e: Envelope) = mailbox.Post ((e, None))

    interface IDisposable with
        member this.Dispose() =
            shuffle.Dispose()
            (mailbox :> IDisposable).Dispose()
module HyparviewTests.HyparviewTests

open System
open Expecto
open Protocols.Hyparview
open System.Threading
open System.Threading.Channels
open FSharp.Control.Tasks

/// In-memory test transport. It routes envelopes directly into registered `Membership`
/// instances, tracks which endpoint pairs are connected, and collects every `PeerEvent`
/// in a channel.
[<Sealed>]
type TestEnv() =
    let sync = obj()
    // Both fields are read from mailbox threads, so every access goes through `sync`.
    let mutable peers : Map<Endpoint, Membership> = Map.empty
    let mutable connections : Set<Endpoint * Endpoint> = Set.empty
    let peerEvents = Channel.CreateUnbounded<Endpoint * PeerEvent>()
    // Records a bidirectional connection; idempotent.
    let connect endpoint target =
        lock sync (fun () ->
            connections <- connections |> Set.add (endpoint, target) |> Set.add (target, endpoint))
    let disconnect endpoint target =
        lock sync (fun () ->
            connections <- connections |> Set.remove (endpoint, target) |> Set.remove (target, endpoint))
    let tryFindPeer target = lock sync (fun () -> Map.tryFind target peers)

    /// Makes `peer` reachable under `endpoint`.
    member this.Register(endpoint, peer) =
        lock sync (fun () -> peers <- Map.add endpoint peer peers)

    /// True when `a` and `b` are connected in either direction.
    member this.AreConnected(a, b) =
        lock sync (fun () -> Set.contains (a, b) connections || Set.contains (b, a) connections)

    /// All peer events published so far, as (reporting endpoint, event) pairs.
    member this.Events = peerEvents.Reader

    /// Reactor used by the node at `endpoint`.
    member this.Reactor (endpoint: Endpoint) =
        { new Reactor with
            member __.Send(target, msg) = async {
                match tryFindPeer target with
                | Some m ->
                    connect endpoint target
                    m.Post msg
                | None -> failwithf "Couldn't find target node: %O" target
            }
            member __.Disconnect(target) = async {
                disconnect endpoint target
            }
            member __.Notify event =
                // The channel is unbounded, so TryWrite can only fail after the writer has
                // been completed. This avoids blocking on a ValueTask inside a callback.
                if not (peerEvents.Writer.TryWrite((endpoint, event))) then
                    failwithf "Couldn't publish peer event %O from %s" event endpoint }
/// Creates a `Membership` wired to `env` and registers it under its local endpoint.
let private membership (env: TestEnv) (config: Config) (random: Random) =
    let node = new Membership(env.Reactor config.LocalEndpoint, config, random)
    env.Register(config.LocalEndpoint, node)
    node
/// Reads exactly `count` peer events from `env` in arrival order; `ct` cancels the wait.
let private poll count (env: TestEnv) (ct: CancellationToken) = task {
    let received = ResizeArray<_>(count)
    for _ in 1 .. count do
        let! event = env.Events.ReadAsync ct
        received.Add event
    return received.ToArray()
}
/// Connection-state assertions over a `TestEnv`.
[<RequireQualifiedAccess>]
module Expect =
    /// Fails unless every (x, y) pair in `connections` is connected.
    let connected (env: TestEnv) connections =
        for (x, y) in connections do
            Expect.isTrue (env.AreConnected(x, y)) (sprintf "%s-%s should be connected" x y)

    /// Fails if any (x, y) pair in `connections` is connected.
    let disconnected (env: TestEnv) connections =
        for (x, y) in connections do
            Expect.isFalse (env.AreConnected(x, y)) (sprintf "%s-%s should be disconnected" x y)
/// Protocol scenarios driven through the in-memory `TestEnv` transport.
[<Tests>]
let tests =
    testList "Hyparview" [
        testTask "join & leave" {
            do! task {
                let env = TestEnv()
                use a = membership env (Config.Create "A") (Random())
                use b = membership env (Config.Create "B") (Random())
                use cancel = new CancellationTokenSource(1000)
                let nextEvents count = poll count env cancel.Token

                do! a.Join "B"
                let! joined = nextEvents 2
                let bothUp = Set.ofList [ ("A", NeighborUp "B"); ("B", NeighborUp "A") ]
                Expect.equal (Set.ofArray joined) bothUp "neighbor up should be notified by both joining nodes"
                Expect.isTrue (env.AreConnected("A", "B")) "both parties should be connected"

                do! b.Disconnect "A"
                let! left = nextEvents 2
                let bothDown = Set.ofList [ ("A", NeighborDown "B"); ("B", NeighborDown "A") ]
                Expect.equal (Set.ofArray left) bothDown "neighbor down should be notified by both disconnecting nodes"
                Expect.disconnected env [("A", "B")]
            }
        }
        testTask "limit active view size" {
            do! task {
                let env = TestEnv()
                // Every node gets an active view of 2, a passive view of 3 and the same RNG seed.
                let node name =
                    membership env ({ Config.Create name with ActiveViewCapacity = 2; PassiveViewCapacity = 3 }) (Random(1234))
                use a = node "A"
                use b = node "B"
                use c = node "C"
                use d = node "D"
                use cancel = new CancellationTokenSource(1000)
                let nextEvents count = poll count env cancel.Token

                do! a.Join "B"
                do! a.Join "C"
                do! b.Join "C"
                // A, B and C form a full mesh, which fills every active view (capacity 2).
                let meshUp =
                    Set.ofList [
                        ("A", NeighborUp "B")
                        ("A", NeighborUp "C")
                        ("B", NeighborUp "C")
                        ("B", NeighborUp "A")
                        ("C", NeighborUp "B")
                        ("C", NeighborUp "A")
                    ]
                let! meshEvents = nextEvents 6
                Expect.equal (Set.ofArray meshEvents) meshUp "neighbor up should be notified by all joining nodes"
                Expect.connected env [("A", "B"); ("A", "C") ; ("B", "C")]

                // D has no other neighbours, so its request is high priority and A must evict
                // one of its current peers (B with this seed).
                do! d.Join "A"
                let evictionEvents =
                    Set.ofList [
                        ("D", NeighborUp "A")
                        ("A", NeighborUp "D")
                        ("A", NeighborDown "B")
                        ("B", NeighborDown "A")
                    ]
                let! afterJoin = nextEvents 4
                Expect.equal (Set.ofArray afterJoin) evictionEvents "D should join with A, which should disconnect from B or C"
                Expect.connected env [("A", "D"); ("A", "C") ; ("B", "C")]
                Expect.disconnected env [("A", "B"); ("D", "B") ; ("D", "C")]
            }
        }
    ]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment