Skip to content

Instantly share code, notes, and snippets.

@Horusiath
Last active August 18, 2021 10:29
Show Gist options
  • Save Horusiath/fe6b124c0bf38291bc5c87a754db0681 to your computer and use it in GitHub Desktop.
Save Horusiath/fe6b124c0bf38291bc5c87a754db0681 to your computer and use it in GitHub Desktop.
HyParView: a membership protocol for reliable gossip-based broadcast
// Reference: https://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf
module Protocols.Hyparview
open System
open System.Runtime.ExceptionServices
open System.Threading
type Endpoint = string
type TTL = int
type Config =
{ LocalEndpoint: Endpoint // identifier of current node
ActiveRandomWalkLength: int // init TTL for `Neighbor` message
PassiveRandomWalkLength: int // TTL threshold after which Neighbor may result in passive state addition
ActiveViewCapacity: int // max allowed size of active view
PassiveViewCapacity: int // max allowed size of passive view
ShuffleTTL: int // init TTL for `Shuffle` message
ShuffleActiveViewCount: int // no. of active peers included in `Shuffle` message
ShufflePassiveViewCount: int // no. of passive peers included in `Shuffle` message
ShuffleInterval: TimeSpan } // time interval in which shuffling procedure is started
static member Create(endpoint) = { Config.Default with LocalEndpoint = endpoint }
static member Default =
{ LocalEndpoint = "localhost:5000"
ActiveRandomWalkLength = 5
PassiveRandomWalkLength = 2
ActiveViewCapacity = 4
PassiveViewCapacity = 24
ShuffleTTL = 2
ShuffleActiveViewCount = 2
ShufflePassiveViewCount = 2
ShuffleInterval = TimeSpan.FromSeconds 60. }
type Shuffle =
{ Origin: Endpoint // the original initiator of Shuffle request
Nodes: Set<Endpoint> // shuffled nodes that being exchanged
Ttl: TTL } // remaining time to live
type Message =
/// Initial request to join to given cluster.
| Join
/// Upon Join, ForwardJoin is gossiped over the cluster to introduce `peer` to other cluster members.
| ForwardJoin of peer:Endpoint * ttl:TTL
/// Message that triggers shuffle procedure.
| DoShuffle
/// Shuffle request, that is used to refresh and exchange passive view with other nodes.
| Shuffle of Shuffle
/// Shuffle reply, that contains exchanged nodes.
| ShuffleReply of Set<Endpoint>
/// Request to add sender to an active view of recipient. If `highPriority` is set, it cannot be denied.
| Neighbor of highPriority:bool
/// Disconnect request. If `alive` is set, sender can safely be added to passive set for future reconnections.
/// If `response` is set, recipient should answer with its own `Disconnect` (with respond=false) as well.
| Disconnect of alive:bool * respond:bool
/// Determines if this message type can be used to initialize connection. Such messages needs to be checked
/// if sender is in active set, and disconnect from it otherwise.
member this.CanInit =
match this with
| Disconnect _ | DoShuffle -> false
| _ -> true
[<Struct>]
type Envelope = { Peer: Endpoint; Message: Message }
type PeerEvent =
| NeighborUp of Endpoint // Notifies that new peer has been added to active set.
| NeighborDown of Endpoint // Notifies that existing peer has stepped down from active set.
[<Interface>]
type Reactor =
/// Sends a `message` to a `target` node. If current and `target` nodes where not connected so far,
/// it will establish connection first.
abstract Send: target:Endpoint * envelope:Envelope -> Async<unit>
/// Disconnects from previously connected node (connection happens on initial `this.Send` request).
abstract Disconnect: Endpoint -> Async<unit>
/// Sends a peer event notification.
abstract Notify: PeerEvent -> unit
type PeerState =
{ ActiveView: Set<Endpoint>
PassiveView: Set<Endpoint>
Config: Config
Random: Random
Output: Reactor }
static member Create (config: Config) (out: Reactor) (random: Random) =
{ ActiveView = Set.empty
PassiveView = Set.empty
Random = random
Config = config
Output = out }
member this.Self = this.Config.LocalEndpoint
member this.Send(target, msg: Message) = this.Output.Send(target, { Peer = this.Self; Message = msg })
type Random with
member this.Pick(set: Set<'a>): 'a option =
if Set.isEmpty set then None
else
let i = this.Next(set.Count)
set |> Seq.skip i |> Seq.tryHead
member this.Shuffle(set: Set<'a>): 'a seq =
set |> Seq.sortBy (fun _ -> this.Next())
let private isPassiveViewFull (state: PeerState) =
state.PassiveView.Count >= state.Config.PassiveViewCapacity
let private addPassive (peer: Endpoint) (state: PeerState) =
if Set.contains peer state.ActiveView || Set.contains peer state.PassiveView || peer = state.Self
then state
else
let passive =
if isPassiveViewFull state then
state.Random.Pick state.PassiveView
|> Option.map (fun drop -> Set.remove drop state.PassiveView)
|> Option.defaultValue state.PassiveView
else state.PassiveView
{ state with PassiveView = Set.add peer passive }
let private isActiveViewFull (state: PeerState) =
state.ActiveView.Count >= state.Config.ActiveViewCapacity
let private removeActive (peer: Endpoint) (state: PeerState) respond = async {
let active = Set.remove peer state.ActiveView
if obj.ReferenceEquals(active, state.ActiveView) then return state
else
if respond then
do! state.Send(peer, Disconnect(alive=true, respond=false))
do! state.Output.Disconnect(peer)
state.Output.Notify(NeighborDown peer)
return addPassive peer { state with ActiveView = active }
}
let private removeActiveIfFull state = async {
if isActiveViewFull state then
match state.Random.Pick state.ActiveView with
| Some drop -> return! removeActive drop state true
| None -> return state
else return state
}
let private addActive (peer: Endpoint) (highPriority: bool) (state: PeerState) = async {
if Set.contains peer state.ActiveView || peer = state.Self then return state
else
let! state = removeActiveIfFull state
let passive = Set.remove peer state.PassiveView
let active = Set.add peer state.ActiveView
do! state.Send(peer, Neighbor(highPriority))
state.Output.Notify(NeighborUp peer)
return { state with ActiveView = active; PassiveView = passive }
}
let private onDisconnect (old: PeerState) (peer: Endpoint) alive respond = async {
let! state = removeActive peer old respond
if not (obj.ReferenceEquals(old, state)) then
let passive = Set.remove peer state.PassiveView
if not (isActiveViewFull state) then
match state.Random.Pick passive with
| Some node ->
let highPriority = Set.isEmpty state.ActiveView
do! state.Send(node, Neighbor(highPriority))
return if alive then addPassive peer state else state
| None -> return if alive then addPassive peer state else state
else
return if alive then addPassive peer state else state
else return state
}
let private doShuffle (state: PeerState) = async {
match state.Random.Pick state.ActiveView with
| None -> return state
| Some node ->
let active =
state.Random.Shuffle (Set.remove node state.ActiveView)
|> Seq.take state.Config.ShuffleActiveViewCount
|> Set.ofSeq
let passive =
state.Random.Shuffle state.PassiveView
|> Seq.take state.Config.ShufflePassiveViewCount
|> Set.ofSeq
let msg =
{ Origin = state.Self
Nodes = active + passive
Ttl = state.Config.ShuffleTTL }
do! state.Send(node, Shuffle msg)
return state
}
let private onNeighbor (state: PeerState) (peer: Endpoint) highPriority = async {
if highPriority || not (isActiveViewFull state) then
return! addActive peer highPriority state
else return state
}
let private onJoin (state: PeerState) (peer: Endpoint) = async {
let! state = addActive peer true state
let ttl = state.Config.ActiveRandomWalkLength
let fwd = ForwardJoin(peer, ttl)
for node in Set.remove peer state.ActiveView do
do! state.Send(node, fwd)
return state
}
let private onForwardJoin (state: PeerState) (peer: Endpoint) (sender: Endpoint) ttl = async {
if ttl = 0 || Set.isEmpty state.ActiveView then return! addActive peer true state
else
let state = if ttl = state.Config.PassiveRandomWalkLength then addPassive peer state else state
match state.Random.Pick(Set.remove sender state.ActiveView) with
| None -> return! addActive peer true state
| Some next ->
do! state.Send(next, ForwardJoin(peer, ttl-1))
return state
}
let private onShuffle (state: PeerState) (shuffle: Shuffle) sender = async {
if shuffle.Ttl = 0 then
let nodes =
state.Random.Shuffle(state.PassiveView)
|> Seq.take shuffle.Nodes.Count
|> Set.ofSeq
do! state.Send(shuffle.Origin, ShuffleReply(nodes))
return shuffle.Nodes |> Set.fold (fun acc node -> addPassive node acc) state
else match state.Random.Pick(state.ActiveView - Set.ofList [shuffle.Origin; sender]) with
| Some node ->
do! state.Send(node, Shuffle { shuffle with Ttl = shuffle.Ttl - 1 })
return state
| None -> return state
}
let private onShuffleReply (state: PeerState) (nodes: Set<Endpoint>) = async {
return nodes |> Set.fold (fun acc peer -> addPassive peer acc) state
}
/// Disconnect peer if its not in active set.
let private disconnectNonActivePeer (state: PeerState) (peer: Endpoint) = async {
if peer <> state.Self && not (Set.contains peer state.ActiveView) then
do! state.Send(peer, Disconnect(alive=true, respond=false))
do! state.Output.Disconnect(peer)
}
let handle (state: PeerState) (e: Envelope) : Async<PeerState> = async {
let! state =
match e.Message with
| Join -> onJoin state e.Peer
| ForwardJoin(peer, ttl) -> onForwardJoin state peer e.Peer ttl
| Shuffle shuffle -> onShuffle state shuffle e.Peer
| ShuffleReply nodes -> onShuffleReply state nodes
| Neighbor highPriority -> onNeighbor state e.Peer highPriority
| Disconnect(alive, respond) -> onDisconnect state e.Peer alive respond
| DoShuffle -> doShuffle state
// make sure to dispose the connection from sender if it's not a part of active view
if e.Message.CanInit then
do! disconnectNonActivePeer state e.Peer
return state
}
let private actor (state: PeerState) =
let mailbox = MailboxProcessor.Start(fun inbox ->
let rec loop state = async {
let! (msg, ch: AsyncReplyChannel<exn option> option) = inbox.Receive()
try
let! state = handle state msg
ch |> Option.iter (fun c -> c.Reply None)
return! loop state
with
| e ->
ch |> Option.iter (fun c -> c.Reply (Some e))
return! loop state
}
loop state)
mailbox
type Membership(out: Reactor, ?config: Config, ?random: Random) =
let config = config |> Option.defaultValue Config.Default
let random = random |> Option.defaultWith Random
let state = PeerState.Create config out random
let mailbox = actor state
let shuffle =
let interval = int config.ShuffleInterval.TotalMilliseconds
new Timer(TimerCallback(fun _ ->
mailbox.Post ({ Peer = config.LocalEndpoint; Message = DoShuffle }, None)), null, interval, interval)
member this.Join(endpoint: Endpoint) = async {
match! mailbox.PostAndAsyncReply(fun ch -> ({ Peer = endpoint; Message = Join }, Some ch)) with
| None -> ()
| Some e -> ExceptionDispatchInfo.Capture(e).Throw()
}
member this.Disconnect(endpoint: Endpoint) = async {
match! mailbox.PostAndAsyncReply(fun ch -> ({ Peer = endpoint; Message = Disconnect(true, true) }, Some ch)) with
| None -> ()
| Some e -> ExceptionDispatchInfo.Capture(e).Throw()
}
member this.Endpoint = config.LocalEndpoint
member this.Post (e: Envelope) = mailbox.Post ((e, None))
interface IDisposable with
member this.Dispose() =
shuffle.Dispose()
(mailbox :> IDisposable).Dispose()
module HyparviewTests.HyparviewTests
open System
open Expecto
open Protocols.Hyparview
open System.Threading
open System.Threading.Channels
open FSharp.Control.Tasks
[<Sealed>]
type TestEnv() =
let sync = obj()
let mutable peers = Map.empty
let mutable connections = Set.empty
let peerEvents = Channel.CreateUnbounded<_>()
let connect endpoint target =
lock sync (fun () ->
//printfn "Connecting %s to %s" endpoint target
connections <- Set.add (endpoint, target) connections
connections <- Set.add (target, endpoint) connections
)
member this.Register(endpoint, peer) = peers <- Map.add endpoint peer peers
member this.AreConnected(a, b) =
lock sync ( fun () -> Set.contains (a, b) connections || Set.contains (b, a) connections)
member this.Events = peerEvents.Reader
member this.Reactor (endpoint: Endpoint) =
{ new Reactor with
member this.Send(target, msg) = async {
match Map.tryFind target peers with
| Some (m: Membership) ->
if not (Set.contains (endpoint, target) connections) then
connect endpoint target
m.Post msg
//printfn "%s sent to %s: %O" endpoint target msg
| None -> failwithf "Couldn't find target node: %O" target
}
member this.Disconnect(target) = async {
lock sync (fun () ->
//printfn "Disconnecting %s from %s" endpoint target
connections <- Set.remove (endpoint, target) connections
connections <- Set.remove (target, endpoint) connections
)
}
member this.Notify event =
peerEvents.Writer.WriteAsync((endpoint, event)).GetAwaiter().GetResult() }
let private membership (env: TestEnv) (config: Config) (random: Random) =
let reactor = env.Reactor config.LocalEndpoint
let m = new Membership(reactor, config, random)
env.Register(config.LocalEndpoint, m)
m
let private poll count (env: TestEnv) (ct: CancellationToken) = task {
let messages = Array.zeroCreate count
for i=0 to count-1 do
let! msg = env.Events.ReadAsync ct
messages.[i] <- msg
return messages
}
[<RequireQualifiedAccess>]
module Expect =
let connected (env: TestEnv) connections =
for (x, y) in connections do
Expect.isTrue (env.AreConnected(x, y)) <| sprintf "%s-%s should be connected" x y
let disconnected (env: TestEnv) connections =
for (x, y) in connections do
Expect.isFalse (env.AreConnected(x, y)) <| sprintf "%s-%s should be diconnected" x y
[<Tests>]
let tests = testList "Hyparview" [
testTask "join & leave" {
do! task {
let env = TestEnv()
use a = membership env (Config.Create "A") (Random())
use b = membership env (Config.Create "B") (Random())
use cancel = new CancellationTokenSource(1000)
do! a.Join "B"
let! actual = poll 2 env cancel.Token
let expected = Set.ofList [("A", NeighborUp "B"); ("B", NeighborUp "A")]
Expect.equal (Set.ofArray actual) expected "neighbor up should be notified by both joining nodes"
Expect.isTrue (env.AreConnected("A", "B")) "both parties should be connected"
do! b.Disconnect "A"
let! actual = poll 2 env cancel.Token
let expected = Set.ofList [("A", NeighborDown "B"); ("B", NeighborDown "A")]
Expect.equal (Set.ofArray actual) expected "neighbor down should be notified by both disconnecting nodes"
Expect.disconnected env [("A", "B")]
}
}
testTask "limit active view size" {
do! task {
let env = TestEnv()
use a = membership env ({ Config.Create "A" with ActiveViewCapacity = 2; PassiveViewCapacity = 3 }) (Random(1234))
use b = membership env ({ Config.Create "B" with ActiveViewCapacity = 2; PassiveViewCapacity = 3 }) (Random(1234))
use c = membership env ({ Config.Create "C" with ActiveViewCapacity = 2; PassiveViewCapacity = 3 }) (Random(1234))
use d = membership env ({ Config.Create "D" with ActiveViewCapacity = 2; PassiveViewCapacity = 3 }) (Random(1234))
use cancel = new CancellationTokenSource(1000)
do! a.Join "B"
do! a.Join "C"
do! b.Join "C"
// A-B-C are interconnected to each other (active view cap = 2)
let expected = Set.ofList [
("A", NeighborUp "B")
("A", NeighborUp "C")
("B", NeighborUp "C")
("B", NeighborUp "A")
("C", NeighborUp "B")
("C", NeighborUp "A")
]
let! actual = poll 6 env cancel.Token
Expect.equal (Set.ofArray actual) expected "neighbor up should be notified by all joining nodes"
Expect.connected env [("A", "B"); ("A", "C") ; ("B", "C")]
do! d.Join "A" // since D joins A and has no other neighbors, it has higher priority. A must disconnect from either B or C.
let expected = Set.ofList [
("D", NeighborUp "A")
("A", NeighborUp "D")
("A", NeighborDown "B")
("B", NeighborDown "A")
]
let! actual = poll 4 env cancel.Token
Expect.equal (Set.ofArray actual) expected "D should join with A, which should disconnect from B or C"
Expect.connected env [("A", "D"); ("A", "C") ; ("B", "C")]
Expect.disconnected env [("A", "B"); ("D", "B") ; ("D", "C")]
}
}
]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment