Last active
August 18, 2021 10:29
-
-
Save Horusiath/fe6b124c0bf38291bc5c87a754db0681 to your computer and use it in GitHub Desktop.
HyParView: a membership protocol for reliable gossip-based broadcast
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Reference: https://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf
module Protocols.Hyparview | |
open System | |
open System.Runtime.ExceptionServices | |
open System.Threading | |
/// Identifier of a node taking part in the protocol (e.g. `localhost:5000`).
type Endpoint = string

/// Remaining number of hops a gossiped message may still travel.
type TTL = int
/// Tunable parameters of a single HyParView node.
type Config =
    { /// Identifier of the current node.
      LocalEndpoint: Endpoint
      /// Initial TTL put on `ForwardJoin` messages emitted when a peer joins.
      ActiveRandomWalkLength: int
      /// When a `ForwardJoin` arrives with exactly this TTL, the joining peer
      /// is also remembered in the passive view.
      PassiveRandomWalkLength: int
      /// Maximum allowed size of the active view.
      ActiveViewCapacity: int
      /// Maximum allowed size of the passive view.
      PassiveViewCapacity: int
      /// Initial TTL put on `Shuffle` messages.
      ShuffleTTL: int
      /// Number of active peers included in a `Shuffle` message.
      ShuffleActiveViewCount: int
      /// Number of passive peers included in a `Shuffle` message.
      ShufflePassiveViewCount: int
      /// Period at which the shuffling procedure is started.
      ShuffleInterval: TimeSpan }

    /// Baseline settings; the local endpoint defaults to `localhost:5000`.
    static member Default =
        { LocalEndpoint = "localhost:5000"
          ActiveRandomWalkLength = 5
          PassiveRandomWalkLength = 2
          ActiveViewCapacity = 4
          PassiveViewCapacity = 24
          ShuffleTTL = 2
          ShuffleActiveViewCount = 2
          ShufflePassiveViewCount = 2
          ShuffleInterval = TimeSpan.FromSeconds 60. }

    /// Baseline settings bound to the given local `endpoint`.
    static member Create(endpoint) = { Config.Default with LocalEndpoint = endpoint }
/// Payload of a shuffle request travelling through the overlay.
type Shuffle =
    { /// The node that originally initiated the shuffle request.
      Origin: Endpoint
      /// Sample of nodes being exchanged.
      Nodes: Set<Endpoint>
      /// Remaining time to live of the request.
      Ttl: TTL }
/// Protocol messages exchanged between (or scheduled within) nodes.
type Message =
    /// Initial request to join the given cluster.
    | Join
    /// Upon Join, ForwardJoin is gossiped over the cluster to introduce `peer` to other cluster members.
    | ForwardJoin of peer:Endpoint * ttl:TTL
    /// Message that triggers the shuffle procedure.
    | DoShuffle
    /// Shuffle request, used to refresh and exchange the passive view with other nodes.
    | Shuffle of Shuffle
    /// Shuffle reply, carrying the exchanged nodes.
    | ShuffleReply of Set<Endpoint>
    /// Request to add the sender to the recipient's active view. If `highPriority` is set, it cannot be denied.
    | Neighbor of highPriority:bool
    /// Disconnect request. If `alive` is set, the sender can safely be added to the passive set for future
    /// reconnections. If `respond` is set, the recipient should answer with its own `Disconnect`
    /// (with respond=false) as well.
    | Disconnect of alive:bool * respond:bool

    /// Tells whether this message type can be used to initialize a connection. For such messages the
    /// sender must be checked against the active set and disconnected if it is not a member.
    member this.CanInit =
        match this with
        | Join
        | ForwardJoin _
        | Shuffle _
        | ShuffleReply _
        | Neighbor _ -> true
        | Disconnect _
        | DoShuffle -> false
/// A message together with the endpoint it is attributed to (its sender).
[<Struct>]
type Envelope =
    { Peer: Endpoint
      Message: Message }
/// Notifications about changes of the active view.
type PeerEvent =
    /// A new peer has been added to the active set.
    | NeighborUp of Endpoint
    /// An existing peer has stepped down from the active set.
    | NeighborDown of Endpoint
/// Abstraction over the transport and notification side effects the protocol needs.
[<Interface>]
type Reactor =
    /// Sends `envelope` to the `target` node. If the current node and `target` were not
    /// connected so far, the connection is established first.
    abstract Send: target:Endpoint * envelope:Envelope -> Async<unit>

    /// Disconnects from a previously connected node (the connection is made on the
    /// initial `Send` request).
    abstract Disconnect: Endpoint -> Async<unit>

    /// Publishes a peer event notification.
    abstract Notify: PeerEvent -> unit
/// Immutable protocol state of a single node: both views plus its collaborators.
type PeerState =
    { ActiveView: Set<Endpoint>
      PassiveView: Set<Endpoint>
      Config: Config
      Random: Random
      Output: Reactor }

    /// Builds the initial state with empty active and passive views.
    static member Create (config: Config) (out: Reactor) (random: Random) =
        { Config = config
          Output = out
          Random = random
          ActiveView = Set.empty
          PassiveView = Set.empty }

    /// Endpoint of the node owning this state.
    member this.Self = this.Config.LocalEndpoint

    /// Wraps `msg` into an envelope stamped with the local endpoint and hands it to the reactor.
    member this.Send(target, msg: Message) =
        let envelope = { Peer = this.Self; Message = msg }
        this.Output.Send(target, envelope)
type Random with

    /// Picks one element of `set` at random; `None` for an empty set.
    /// The generator is not advanced when the set is empty.
    member this.Pick(set: Set<'a>): 'a option =
        match set.Count with
        | 0 -> None
        | size -> Seq.tryItem (this.Next size) set

    /// Returns the elements of `set` ordered by freshly drawn random keys.
    /// The result is lazy: keys are drawn when the sequence is enumerated.
    member this.Shuffle(set: Set<'a>): 'a seq =
        Seq.sortBy (fun _ -> this.Next()) set
/// True when the passive view has reached (or exceeded) its configured capacity.
let private isPassiveViewFull (state: PeerState) =
    let capacity = state.Config.PassiveViewCapacity
    Set.count state.PassiveView >= capacity
/// Adds `peer` to the passive view unless it is the local node or already present in
/// either view. When the passive view is full, one randomly picked entry is evicted first.
let private addPassive (peer: Endpoint) (state: PeerState) =
    let alreadyKnown =
        peer = state.Self
        || state.ActiveView.Contains peer
        || state.PassiveView.Contains peer
    if alreadyKnown then state
    else
        let trimmed =
            if not (isPassiveViewFull state) then state.PassiveView
            else
                match state.Random.Pick state.PassiveView with
                | Some victim -> Set.remove victim state.PassiveView
                | None -> state.PassiveView
        { state with PassiveView = Set.add peer trimmed }
/// True when the active view has reached (or exceeded) its configured capacity.
let private isActiveViewFull (state: PeerState) =
    let capacity = state.Config.ActiveViewCapacity
    Set.count state.ActiveView >= capacity
/// Removes `peer` from the active view.
///
/// When `peer` is not an active neighbor the very same `state` instance is returned and
/// no side effect happens. Otherwise the connection is closed (optionally after sending
/// a `Disconnect` to the peer when `respond` is set), `NeighborDown` is published and the
/// peer is moved to the passive view.
let private removeActive (peer: Endpoint) (state: PeerState) respond = async {
    // Test membership explicitly: comparing the result of `Set.remove` with the input
    // by reference is not a reliable "nothing was removed" signal, and getting it wrong
    // would tear down and announce a neighbor that was never in the active view.
    if not (Set.contains peer state.ActiveView) then
        return state
    else
        let active = Set.remove peer state.ActiveView
        if respond then
            do! state.Send(peer, Disconnect(alive=true, respond=false))
        do! state.Output.Disconnect(peer)
        state.Output.Notify(NeighborDown peer)
        return addPassive peer { state with ActiveView = active }
}
/// Makes room in the active view: when it is full, one randomly chosen neighbor is
/// dropped (and told so). Otherwise the state is returned untouched.
let private removeActiveIfFull state = async {
    let victim =
        if isActiveViewFull state then state.Random.Pick state.ActiveView
        else None
    match victim with
    | None -> return state
    | Some drop -> return! removeActive drop state true
}
/// Promotes `peer` into the active view. Nothing happens for the local node or an
/// already active peer. Otherwise room is made if needed, a `Neighbor` request is sent
/// to the peer, `NeighborUp` is published, and the peer leaves the passive view.
let private addActive (peer: Endpoint) (highPriority: bool) (state: PeerState) = async {
    let nothingToDo = (peer = state.Self) || Set.contains peer state.ActiveView
    if nothingToDo then
        return state
    else
        let! trimmed = removeActiveIfFull state
        do! trimmed.Send(peer, Neighbor highPriority)
        trimmed.Output.Notify(NeighborUp peer)
        return
            { trimmed with
                ActiveView = Set.add peer trimmed.ActiveView
                PassiveView = Set.remove peer trimmed.PassiveView }
}
/// Handles a `Disconnect` attributed to `peer`.
///
/// A disconnect from a peer outside of the active view is ignored. Otherwise the peer is
/// removed from the active view and, if that leaves a free slot, a random passive node is
/// asked to become a neighbor (with high priority when the active view became empty).
/// The departed peer is kept in the passive view only when `alive` is set.
let private onDisconnect (old: PeerState) (peer: Endpoint) alive respond = async {
    // Decide by membership rather than by reference identity of the returned state.
    if not (Set.contains peer old.ActiveView) then
        return old
    else
        let! state = removeActive peer old respond
        // `removeActive` parks the peer in the passive view; it must not be picked as
        // its own replacement.
        let candidates = Set.remove peer state.PassiveView
        if not (isActiveViewFull state) then
            match state.Random.Pick candidates with
            | Some node ->
                let highPriority = Set.isEmpty state.ActiveView
                do! state.Send(node, Neighbor(highPriority))
            | None -> ()
        // A peer that announced it is not alive must not linger in the passive view.
        return if alive then addPassive peer state else { state with PassiveView = candidates }
}
/// Starts a shuffle round: picks a random active neighbor and sends it a `Shuffle`
/// request carrying a random sample of the remaining active peers and of the passive
/// view. Does nothing when the active view is empty. The state itself is not changed.
let private doShuffle (state: PeerState) = async {
    match state.Random.Pick state.ActiveView with
    | None -> return state
    | Some node ->
        // `Seq.truncate` (unlike `Seq.take`) tolerates views holding fewer elements
        // than the configured sample size, which is the normal case in small clusters.
        let sample count (view: Set<Endpoint>) =
            state.Random.Shuffle(view)
            |> Seq.truncate count
            |> Set.ofSeq
        let active = sample state.Config.ShuffleActiveViewCount (Set.remove node state.ActiveView)
        let passive = sample state.Config.ShufflePassiveViewCount state.PassiveView
        let msg =
            { Origin = state.Self
              Nodes = active + passive
              Ttl = state.Config.ShuffleTTL }
        do! state.Send(node, Shuffle msg)
        return state
}
/// Handles a `Neighbor` request from `peer`: it is accepted when it is high priority or
/// when the active view still has room; otherwise it is silently declined.
let private onNeighbor (state: PeerState) (peer: Endpoint) highPriority = async {
    let declined = not highPriority && isActiveViewFull state
    if declined then
        return state
    else
        return! addActive peer highPriority state
}
/// Handles a `Join` from `peer`: the peer is added to the active view with high priority
/// and then introduced to every other active neighbor through `ForwardJoin`.
let private onJoin (state: PeerState) (peer: Endpoint) = async {
    let! joined = addActive peer true state
    let introduction = ForwardJoin(peer, joined.Config.ActiveRandomWalkLength)
    let others = Set.remove peer joined.ActiveView
    for node in others do
        do! joined.Send(node, introduction)
    return joined
}
/// Handles a `ForwardJoin` for `peer` received from `sender`.
///
/// The walk ends here (and `peer` becomes an active neighbor) when the TTL is exhausted,
/// when the active view is empty, or when there is nobody but `sender` to forward to.
/// Otherwise the request is forwarded to a random neighbor with a decremented TTL. On the
/// way, a TTL equal to `PassiveRandomWalkLength` also records `peer` in the passive view.
let private onForwardJoin (state: PeerState) (peer: Endpoint) (sender: Endpoint) ttl = async {
    let walkEnded = ttl = 0 || state.ActiveView.IsEmpty
    if walkEnded then
        return! addActive peer true state
    else
        let remembered =
            match ttl = state.Config.PassiveRandomWalkLength with
            | true -> addPassive peer state
            | false -> state
        let candidates = Set.remove sender remembered.ActiveView
        match remembered.Random.Pick candidates with
        | Some next ->
            do! remembered.Send(next, ForwardJoin(peer, ttl - 1))
            return remembered
        | None ->
            return! addActive peer true remembered
}
/// Handles a `Shuffle` request received from `sender`.
///
/// With an exhausted TTL the request is accepted: a random sample of the passive view
/// (at most as many nodes as were received) is sent back to the origin as `ShuffleReply`
/// and the received nodes are merged into the passive view. Otherwise the request is
/// forwarded, with a decremented TTL, to a random active neighbor other than the origin
/// and the sender; when no such neighbor exists the request is dropped.
let private onShuffle (state: PeerState) (shuffle: Shuffle) sender = async {
    // `<= 0` so that a (misconfigured) negative TTL cannot be forwarded forever.
    if shuffle.Ttl <= 0 then
        // `Seq.truncate` instead of `Seq.take`: the passive view is frequently smaller
        // than the incoming sample and `Seq.take` would throw in that case.
        let nodes =
            state.Random.Shuffle(state.PassiveView)
            |> Seq.truncate shuffle.Nodes.Count
            |> Set.ofSeq
        do! state.Send(shuffle.Origin, ShuffleReply(nodes))
        return shuffle.Nodes |> Set.fold (fun acc node -> addPassive node acc) state
    else
        match state.Random.Pick(state.ActiveView - Set.ofList [shuffle.Origin; sender]) with
        | Some node ->
            do! state.Send(node, Shuffle { shuffle with Ttl = shuffle.Ttl - 1 })
            return state
        | None -> return state
}
/// Handles a `ShuffleReply`: every received node is offered to the passive view.
let private onShuffleReply (state: PeerState) (nodes: Set<Endpoint>) = async {
    let merged = (state, nodes) ||> Set.fold (fun acc peer -> addPassive peer acc)
    return merged
}
/// Drops the connection to `peer` when it is neither the local node nor an active
/// neighbor: the peer is sent a `Disconnect` (no reply requested) and the reactor is
/// asked to close the link.
let private disconnectNonActivePeer (state: PeerState) (peer: Endpoint) = async {
    let isStranger = peer <> state.Self && not (state.ActiveView.Contains peer)
    if isStranger then
        do! state.Send(peer, Disconnect(alive=true, respond=false))
        do! state.Output.Disconnect(peer)
}
/// Applies a single incoming envelope to `state` and returns the resulting state.
/// After a message that may have opened a connection (`CanInit`), the sender's
/// connection is disposed again unless the sender ended up in the active view.
let handle (state: PeerState) (e: Envelope) : Async<PeerState> = async {
    let sender = e.Peer
    let transition =
        match e.Message with
        | DoShuffle -> doShuffle state
        | Join -> onJoin state sender
        | Neighbor highPriority -> onNeighbor state sender highPriority
        | ForwardJoin(peer, ttl) -> onForwardJoin state peer sender ttl
        | Disconnect(alive, respond) -> onDisconnect state sender alive respond
        | Shuffle shuffle -> onShuffle state shuffle sender
        | ShuffleReply nodes -> onShuffleReply state nodes
    let! next = transition
    if e.Message.CanInit then
        do! disconnectNonActivePeer next sender
    return next
}
/// Starts the mailbox that serializes all state transitions of a node.
///
/// Each mailbox item is an envelope plus an optional reply channel. After the envelope
/// is handled the channel (if any) receives `None` on success or `Some exn` on failure.
/// A failed message leaves the previous state in place.
let private actor (state: PeerState) =
    MailboxProcessor.Start(fun inbox ->
        let rec loop state = async {
            let! (msg, ch: AsyncReplyChannel<exn option> option) = inbox.Receive()
            // Capture failures as a value instead of wrapping the recursion in try/with:
            // `return!` inside a try/with block is not a tail call in async workflows, so
            // the long-running loop would keep growing with every processed message.
            let! outcome = handle state msg |> Async.Catch
            match outcome with
            | Choice1Of2 next ->
                ch |> Option.iter (fun c -> c.Reply None)
                return! loop next
            | Choice2Of2 e ->
                ch |> Option.iter (fun c -> c.Reply (Some e))
                return! loop state
        }
        loop state)
/// Public façade of a HyParView node. Owns the protocol mailbox and the timer that
/// periodically triggers shuffling; both are released on `Dispose`.
type Membership(out: Reactor, ?config: Config, ?random: Random) =
    let config = defaultArg config Config.Default
    let random =
        match random with
        | Some supplied -> supplied
        | None -> Random()
    let mailbox = actor (PeerState.Create config out random)
    let shuffle =
        let period = int config.ShuffleInterval.TotalMilliseconds
        let tick =
            TimerCallback(fun _ ->
                mailbox.Post ({ Peer = config.LocalEndpoint; Message = DoShuffle }, None))
        new Timer(tick, null, period, period)

    // Posts `message` as if it came from `endpoint`, waits until it has been processed
    // and rethrows (with the original stack trace) whatever the handler failed with.
    let request (message: Message) (endpoint: Endpoint) = async {
        let! failure =
            mailbox.PostAndAsyncReply(fun ch -> ({ Peer = endpoint; Message = message }, Some ch))
        match failure with
        | Some e -> ExceptionDispatchInfo.Capture(e).Throw()
        | None -> ()
    }

    /// Processes a `Join` attributed to `endpoint`; completes once it has been handled.
    member this.Join(endpoint: Endpoint) = request Join endpoint

    /// Processes a `Disconnect` (alive, reply requested) attributed to `endpoint`.
    member this.Disconnect(endpoint: Endpoint) = request (Disconnect(true, true)) endpoint

    /// Endpoint of the local node.
    member this.Endpoint = config.LocalEndpoint

    /// Enqueues an incoming envelope without waiting for it to be processed.
    member this.Post (e: Envelope) = mailbox.Post((e, None))

    interface IDisposable with
        member this.Dispose() =
            shuffle.Dispose()
            (mailbox :> IDisposable).Dispose()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module HyparviewTests.HyparviewTests | |
open System | |
open Expecto | |
open Protocols.Hyparview | |
open System.Threading | |
open System.Threading.Channels | |
open FSharp.Control.Tasks | |
/// In-memory test network: keeps the registered nodes, the set of established
/// (bidirectional) connections and a channel with every published peer event.
[<Sealed>]
type TestEnv() =
    let sync = obj()
    let mutable peers = Map.empty
    let mutable connections = Set.empty
    let peerEvents = Channel.CreateUnbounded<_>()

    // Records the link in both directions.
    let connect endpoint target =
        lock sync (fun () ->
            connections <-
                connections
                |> Set.add (endpoint, target)
                |> Set.add (target, endpoint))

    /// Makes `peer` reachable under `endpoint`.
    member this.Register(endpoint, peer) = peers <- Map.add endpoint peer peers

    /// True when a link between `a` and `b` is recorded in either direction.
    member this.AreConnected(a, b) =
        lock sync (fun () ->
            [ (a, b); (b, a) ] |> List.exists (fun link -> Set.contains link connections))

    /// Reader side of the peer event channel.
    member this.Events = peerEvents.Reader

    /// Builds the reactor used by the node living at `endpoint`.
    member this.Reactor (endpoint: Endpoint) =
        { new Reactor with
            member this.Send(target, msg) = async {
                match Map.tryFind target peers with
                | None -> failwithf "Couldn't find target node: %O" target
                | Some (m: Membership) ->
                    let linked = Set.contains (endpoint, target) connections
                    if not linked then connect endpoint target
                    m.Post msg
            }
            member this.Disconnect(target) = async {
                lock sync (fun () ->
                    connections <-
                        connections
                        |> Set.remove (endpoint, target)
                        |> Set.remove (target, endpoint))
            }
            member this.Notify event =
                peerEvents.Writer.WriteAsync((endpoint, event)).GetAwaiter().GetResult() }
/// Creates a node wired to `env` and registers it under its local endpoint.
let private membership (env: TestEnv) (config: Config) (random: Random) =
    let self = config.LocalEndpoint
    let node = new Membership(env.Reactor(self), config, random)
    env.Register(self, node)
    node
/// Reads exactly `count` peer events from `env`, in arrival order. The read is
/// cancelled through `ct` when the events do not show up.
let private poll count (env: TestEnv) (ct: CancellationToken) = task {
    let received = Array.zeroCreate count
    for slot in 0 .. count - 1 do
        let! evt = env.Events.ReadAsync ct
        received.[slot] <- evt
    return received
}
/// Connection-level assertions on top of the regular `Expect` functions.
[<RequireQualifiedAccess>]
module Expect =

    /// Asserts that every listed pair of endpoints is connected in `env`.
    let connected (env: TestEnv) connections =
        for (x, y) in connections do
            Expect.isTrue (env.AreConnected(x, y)) <| sprintf "%s-%s should be connected" x y

    /// Asserts that none of the listed pairs of endpoints is connected in `env`.
    let disconnected (env: TestEnv) connections =
        for (x, y) in connections do
            Expect.isFalse (env.AreConnected(x, y)) <| sprintf "%s-%s should be disconnected" x y
[<Tests>]
let tests =
    // Nodes of the capacity test all use the same, deliberately tiny, view limits.
    let small name =
        { Config.Create name with ActiveViewCapacity = 2; PassiveViewCapacity = 3 }

    testList "Hyparview" [

        // Two nodes join each other and then part ways again.
        testTask "join & leave" {
            do! task {
                let env = TestEnv()
                use a = membership env (Config.Create "A") (Random())
                use b = membership env (Config.Create "B") (Random())
                use cancel = new CancellationTokenSource(1000)

                do! a.Join "B"
                let! ups = poll 2 env cancel.Token
                let expectedUps = Set.ofList [("A", NeighborUp "B"); ("B", NeighborUp "A")]
                Expect.equal (Set.ofArray ups) expectedUps "neighbor up should be notified by both joining nodes"
                Expect.isTrue (env.AreConnected("A", "B")) "both parties should be connected"

                do! b.Disconnect "A"
                let! downs = poll 2 env cancel.Token
                let expectedDowns = Set.ofList [("A", NeighborDown "B"); ("B", NeighborDown "A")]
                Expect.equal (Set.ofArray downs) expectedDowns "neighbor down should be notified by both disconnecting nodes"
                Expect.disconnected env [("A", "B")]
            }
        }

        // A fourth node joins a fully meshed triangle whose active views are already full.
        testTask "limit active view size" {
            do! task {
                let env = TestEnv()
                use a = membership env (small "A") (Random(1234))
                use b = membership env (small "B") (Random(1234))
                use c = membership env (small "C") (Random(1234))
                use d = membership env (small "D") (Random(1234))
                use cancel = new CancellationTokenSource(1000)

                do! a.Join "B"
                do! a.Join "C"
                do! b.Join "C"

                // A-B-C are interconnected to each other (active view cap = 2)
                let triangle = Set.ofList [
                    ("A", NeighborUp "B")
                    ("A", NeighborUp "C")
                    ("B", NeighborUp "C")
                    ("B", NeighborUp "A")
                    ("C", NeighborUp "B")
                    ("C", NeighborUp "A")
                ]
                let! meshed = poll 6 env cancel.Token
                Expect.equal (Set.ofArray meshed) triangle "neighbor up should be notified by all joining nodes"
                Expect.connected env [("A", "B"); ("A", "C") ; ("B", "C")]

                // since D joins A and has no other neighbors, it has higher priority.
                // A must disconnect from either B or C.
                do! d.Join "A"
                let reshaped = Set.ofList [
                    ("D", NeighborUp "A")
                    ("A", NeighborUp "D")
                    ("A", NeighborDown "B")
                    ("B", NeighborDown "A")
                ]
                let! afterJoin = poll 4 env cancel.Token
                Expect.equal (Set.ofArray afterJoin) reshaped "D should join with A, which should disconnect from B or C"
                Expect.connected env [("A", "D"); ("A", "C") ; ("B", "C")]
                Expect.disconnected env [("A", "B"); ("D", "B") ; ("D", "C")]
            }
        }
    ]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment