Skip to content

Instantly share code, notes, and snippets.

@Horusiath
Last active June 21, 2023 18:20
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Horusiath/84fac596101b197da0546d1697580d99 to your computer and use it in GitHub Desktop.
Save Horusiath/84fac596101b197da0546d1697580d99 to your computer and use it in GitHub Desktop.
Plumtree + Hyparview implementation in F#
namespace Protocols
open System
open System.Runtime.ExceptionServices
/// Identifier/address of a node; used as the key of active, passive, eager and lazy peer sets.
type Endpoint = string
/// Remaining hop count carried by `ForwardJoin` and `Shuffle` messages.
type TTL = int
/// Opaque payload of a broadcast message.
type Binary = byte[]
/// Unique identifier of a broadcast message.
type MessageId = Guid
/// Hop counter carried by gossip metadata.
type Round = uint64
/// Settings consumed by both the membership (`Hyparview`) and broadcast (`Plumtree`) state of a peer.
type Config =
{ LocalEndpoint: Endpoint // identifier of current node
ActiveRandomWalkLength: int // init TTL for `Neighbor` message
PassiveRandomWalkLength: int // TTL threshold after which Neighbor may result in passive state addition
ActiveViewCapacity: int // max allowed size of active view
PassiveViewCapacity: int // max allowed size of passive view
ShuffleTTL: int // init TTL for `Shuffle` message
ShuffleActiveViewCount: int // no. of active peers included in `Shuffle` message
ShufflePassiveViewCount: int // no. of passive peers included in `Shuffle` message
ShuffleInterval: TimeSpan } // time interval in which shuffling procedure is started
/// Returns `Config.Default` with only `LocalEndpoint` replaced by `endpoint`.
static member Create(endpoint) = { Config.Default with LocalEndpoint = endpoint }
/// Baseline values; callers override individual fields with a record copy expression.
static member Default =
{ LocalEndpoint = "localhost:5000"
ActiveRandomWalkLength = 5
PassiveRandomWalkLength = 2
ActiveViewCapacity = 4
PassiveViewCapacity = 24
ShuffleTTL = 2
ShuffleActiveViewCount = 2
ShufflePassiveViewCount = 2
ShuffleInterval = TimeSpan.FromSeconds 60. }
/// Payload of a `Shuffle` request: a sample of peers travelling on a TTL-bounded random walk.
type Shuffle =
{ Origin: Endpoint // the original initiator of Shuffle request
Nodes: Set<Endpoint> // shuffled nodes that are being exchanged
Ttl: TTL } // remaining time to live
/// A broadcast message together with the metadata used to de-duplicate and track it.
[<Struct>]
type Gossip =
{ Id: MessageId // unique id of the broadcast; used as the key of the received-messages map
Round: Round // hop counter; 0 at the broadcasting node, incremented before each forward
Data: Binary } // user payload, delivered to the application as a `Received` event
/// All messages understood by a peer: membership (HyParView), broadcast (Plumtree) and self-addressed triggers.
type Message =
/// Initial request to join a given cluster.
| Join
/// Upon Join, ForwardJoin is gossiped over the cluster to introduce `peer` to other cluster members.
| ForwardJoin of peer:Endpoint * ttl:TTL
/// Message that triggers the shuffle procedure.
| DoShuffle
/// Shuffle request, used to refresh and exchange passive view entries with other nodes.
| Shuffle of Shuffle
/// Shuffle reply, containing the exchanged nodes.
| ShuffleReply of Set<Endpoint>
/// Request to add sender to the active view of the recipient. If `highPriority` is set, it cannot be denied.
| Neighbor of highPriority:bool
/// Disconnect request. If `alive` is set, sender can safely be added to passive set for future reconnections.
/// If `respond` is set, recipient should answer with its own `Disconnect` (with respond=false) as well.
| Disconnect of alive:bool * respond:bool
/// Message with metadata to be broadcasted over the cluster. Metadata is necessary to identify which nodes
/// may have missed it and to optimize too long broadcast tree branches.
| Gossip of Gossip
/// Prune is used to sever tree link between two peers, moving them from eager to lazy sets.
| Prune
/// A batched set of the recently broadcasted messages. It's periodically sent to lazy peers, so they can detect
/// broadcasts that never reached them. If that happens they'll try to reestablish the connection.
| IHave of struct(MessageId*Round)[]
/// Sent to self to trigger grafts for missing messages.
| Timer of MessageId
/// Sent when a missing broadcast has been detected.
| Graft of MessageId * Round
/// Created by the client, it's like `Gossip` but missing metadata that are internal knowledge of current peer.
| Broadcast of Binary
/// Sent to self to trigger reaction to a new active peer showing up.
| Up
/// Sent to self to trigger reaction to an active peer going down.
| Down
/// Sent to self periodically, to flush the announcements scheduled in the lazy queue to the corresponding lazy peers.
| Dispatch
/// False only for `Disconnect` and `DoShuffle`; true for every other message.
member this.CanInit =
match this with
| Disconnect _ | DoShuffle -> false
| _ -> true
/// A message paired with the endpoint it concerns. For messages sent through `State.Send` the
/// `Peer` is the sender; self-addressed `Up`/`Down` messages carry the affected peer instead.
[<Struct>]
type Envelope = { Peer: Endpoint; Message: Message }
/// Notifications pushed to the application through `Reactor.Notify`.
type PeerEvent =
| NeighborUp of Endpoint // Notifies that new peer has been added to active set.
| NeighborDown of Endpoint // Notifies that existing peer has stepped down from active set.
| Received of Binary // Carries the payload of a gossip that was received for the first time.
/// Abstraction over the transport and the application callbacks used by the protocol state machines.
[<Interface>]
type Reactor =
/// Sends a `message` to a `target` node. If current and `target` nodes were not connected so far,
/// it will establish connection first.
abstract Send: target:Endpoint * envelope:Envelope -> Async<unit>
/// Disconnects from previously connected node (connection happens on initial `this.Send` request).
abstract Disconnect: Endpoint -> Async<unit>
/// Sends a peer event notification.
abstract Notify: PeerEvent -> unit
/// Record of an `IHave` note: `Sender` announced that it holds message `MessageId` at hop `Round`.
[<Struct>]
type Announcement = { Sender: Endpoint; MessageId: MessageId; Round: Round }
/// Reference: https://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf
module Hyparview =
/// Immutable membership state of a single node; every handler returns an updated copy.
type State =
{ ActiveView: Set<Endpoint> // peers this node currently treats as neighbors
PassiveView: Set<Endpoint> // backup peers that may be promoted into the active view
Config: Config
Random: Random // source of randomness for picking and shuffling peers
Output: Reactor } // transport + notification sink
/// Creates a state with empty active and passive views.
static member Create (config: Config) (out: Reactor) (random: Random) =
{ ActiveView = Set.empty
PassiveView = Set.empty
Random = random
Config = config
Output = out }
/// Endpoint of the local node.
member this.Self = this.Config.LocalEndpoint
/// Sends `msg` to `target`, wrapped in an envelope that names the local node as the sender.
member this.Send(target, msg: Message) = this.Output.Send(target, { Peer = this.Self; Message = msg })
type Random with
    /// Returns the element found at a randomly drawn index of `set`, or `None` for an empty set.
    member this.Pick(set: Set<'a>): 'a option =
        match set.Count with
        | 0 -> None
        | count -> Seq.tryItem (this.Next count) set
    /// Returns the elements of `set` sorted by randomly drawn keys.
    member this.Shuffle(set: Set<'a>): 'a seq =
        Seq.sortBy (fun _ -> this.Next()) set
/// True once the passive view holds at least `PassiveViewCapacity` peers.
let private isPassiveViewFull (state: State) =
    let capacity = state.Config.PassiveViewCapacity
    state.PassiveView.Count >= capacity
/// Adds `peer` to the passive view unless it is the local node or is already present in either view.
/// When the passive view is full, one randomly picked entry is evicted to make room.
let private addPassive (peer: Endpoint) (state: State) =
    let alreadyKnown =
        Set.contains peer state.ActiveView
        || Set.contains peer state.PassiveView
        || peer = state.Self
    if alreadyKnown then state
    else
        let trimmed =
            if not (isPassiveViewFull state) then state.PassiveView
            else
                match state.Random.Pick state.PassiveView with
                | Some victim -> Set.remove victim state.PassiveView
                | None -> state.PassiveView
        { state with PassiveView = Set.add peer trimmed }
/// True once the active view holds at least `ActiveViewCapacity` peers.
let private isActiveViewFull (state: State) =
    let capacity = state.Config.ActiveViewCapacity
    state.ActiveView.Count >= capacity
/// Removes `peer` from the active view: optionally sends it a `Disconnect`, closes the connection,
/// raises `NeighborDown`, posts `Down` to the local node and parks the peer in the passive view.
/// When `peer` is not an active neighbor nothing is sent and the very same `state` instance is returned.
let private removeActive (peer: Endpoint) (state: State) respond = async {
    // Explicit membership test: `Set.remove` gives no guarantee of returning the same instance
    // when nothing was removed, so reference equality cannot be used to detect a no-op.
    if not (Set.contains peer state.ActiveView) then return state
    else
        let active = Set.remove peer state.ActiveView
        if respond then
            do! state.Send(peer, Disconnect(alive=true, respond=false))
        do! state.Output.Disconnect(peer)
        state.Output.Notify(NeighborDown peer)
        // let the other protocol layers of this node react to the lost neighbor
        do! state.Output.Send(state.Self, { Peer = peer; Message = Down })
        return addPassive peer { state with ActiveView = active }
}
/// Makes room in a full active view by removing one randomly picked neighbor
/// (the neighbor is told to disconnect). Returns the state untouched otherwise.
let private removeActiveIfFull state = async {
    if not (isActiveViewFull state) then return state
    else
        match state.Random.Pick state.ActiveView with
        | None -> return state
        | Some victim -> return! removeActive victim state true
}
/// Puts `peer` into the active view unconditionally (evicting a random neighbor when the view is full),
/// sends it a `Neighbor` request, raises `NeighborUp` and posts `Up` to the local node.
/// Does nothing when `peer` is the local node or is already active.
let private addActive (peer: Endpoint) (highPriority: bool) (state: State) = async {
    let nothingToDo = Set.contains peer state.ActiveView || peer = state.Self
    if nothingToDo then return state
    else
        let! trimmed = removeActiveIfFull state
        do! trimmed.Send(peer, Neighbor(highPriority))
        trimmed.Output.Notify(NeighborUp peer)
        do! trimmed.Output.Send(trimmed.Self, { Peer = peer; Message = Up })
        return
            { trimmed with
                ActiveView = Set.add peer trimmed.ActiveView
                PassiveView = Set.remove peer trimmed.PassiveView }
}
/// Handles a `Disconnect` from `peer`.
/// - Ignored (state returned unchanged) when `peer` is not an active neighbor.
/// - Otherwise the peer is removed from the active view; `respond` asks for a `Disconnect` to be sent back.
/// - `alive = true` keeps the peer in the passive view for later reconnection; `alive = false` drops it
///   from the passive view as well.
/// - If the active view now has a free slot, a random passive peer (other than `peer`) is asked to
///   become a neighbor, with high priority when the active view is empty.
let private onDisconnect (old: State) (peer: Endpoint) alive respond = async {
    if not (Set.contains peer old.ActiveView) then return old
    else
        let! state = removeActive peer old respond
        // removeActive parks the removed peer in the passive view, so a dead peer must be taken
        // out again explicitly - otherwise it would be offered for promotion and shuffles later on.
        let state =
            if alive then addPassive peer state
            else { state with PassiveView = Set.remove peer state.PassiveView }
        if not (isActiveViewFull state) then
            // pick one passive peer at random and try to promote it into the active view
            match state.Random.Pick (Set.remove peer state.PassiveView) with
            | Some node ->
                let highPriority = Set.isEmpty state.ActiveView
                do! state.Send(node, Neighbor(highPriority))
            | None -> ()
        return state
}
/// Starts a shuffle: picks a random active neighbor and sends it a `Shuffle` request carrying up to
/// `ShuffleActiveViewCount` other active peers and up to `ShufflePassiveViewCount` passive peers.
/// Does nothing when the active view is empty. The state itself is never modified.
let private doShuffle (state: State) = async {
    match state.Random.Pick state.ActiveView with
    | None -> return state
    | Some node ->
        // `Seq.truncate` (unlike `Seq.take`) does not throw when a view holds fewer peers than requested
        let sample count (peers: Set<Endpoint>) =
            state.Random.Shuffle peers
            |> Seq.truncate count
            |> Set.ofSeq
        let active = sample state.Config.ShuffleActiveViewCount (Set.remove node state.ActiveView)
        let passive = sample state.Config.ShufflePassiveViewCount state.PassiveView
        let msg =
            { Origin = state.Self
              Nodes = active + passive
              Ttl = state.Config.ShuffleTTL }
        do! state.Send(node, Shuffle msg)
        return state
}
/// Handles a `Neighbor` request: accepted when it is high priority or the active view has a free
/// slot, silently ignored otherwise.
let private onNeighbor (state: State) (peer: Endpoint) highPriority = async {
    let rejected = not highPriority && isActiveViewFull state
    if rejected then return state
    else return! addActive peer highPriority state
}
/// Handles `Join`: makes `peer` an active neighbor and introduces it to every other active
/// neighbor with a `ForwardJoin` whose TTL starts at `ActiveRandomWalkLength`.
let private onJoin (state: State) (peer: Endpoint) = async {
    let! joined = addActive peer true state
    let announcement = ForwardJoin(peer, joined.Config.ActiveRandomWalkLength)
    let others = Set.remove peer joined.ActiveView
    for node in others do
        do! joined.Send(node, announcement)
    return joined
}
/// Handles `ForwardJoin`: the walk ends here (and `peer` becomes an active neighbor) when the TTL
/// is exhausted, the active view is empty, or there is nobody but `sender` to forward to. Otherwise
/// the request travels on to a random neighbor with a decremented TTL; at the
/// `PassiveRandomWalkLength` hop the peer is additionally remembered in the passive view.
let private onForwardJoin (state: State) (peer: Endpoint) (sender: Endpoint) ttl = async {
    let walkEnded = (ttl = 0) || Set.isEmpty state.ActiveView
    if walkEnded then return! addActive peer true state
    else
        let state =
            if ttl <> state.Config.PassiveRandomWalkLength then state
            else addPassive peer state
        let candidates = Set.remove sender state.ActiveView
        match state.Random.Pick candidates with
        | Some next ->
            do! state.Send(next, ForwardJoin(peer, ttl-1))
            return state
        | None -> return! addActive peer true state
}
/// Handles a `Shuffle` request.
/// While TTL remains and there is an active neighbor other than the origin and the sender, the
/// request is forwarded with a decremented TTL. Otherwise the shuffle is accepted here: the origin
/// gets a `ShuffleReply` with up to `shuffle.Nodes.Count` random passive peers and the received
/// nodes are merged into the passive view.
let private onShuffle (state: State) (shuffle: Shuffle) sender = async {
    let forwardTo =
        // a non-positive TTL always terminates the walk
        if shuffle.Ttl <= 0 then None
        else state.Random.Pick(state.ActiveView - Set.ofList [shuffle.Origin; sender])
    match forwardTo with
    | Some node ->
        do! state.Send(node, Shuffle { shuffle with Ttl = shuffle.Ttl - 1 })
        return state
    | None ->
        // Accept instead of dropping the request when there is nobody to forward it to.
        let nodes =
            state.Random.Shuffle(state.PassiveView)
            // `Seq.truncate` tolerates a passive view smaller than the received sample
            |> Seq.truncate shuffle.Nodes.Count
            |> Set.ofSeq
        do! state.Send(shuffle.Origin, ShuffleReply(nodes))
        return shuffle.Nodes |> Set.fold (fun acc node -> addPassive node acc) state
}
/// Handles `ShuffleReply`: merges every received node into the passive view.
let private onShuffleReply (state: State) (nodes: Set<Endpoint>) = async {
    let merged = Set.fold (fun acc peer -> addPassive peer acc) state nodes
    return merged
}
/// Sends a `Disconnect` to `peer` and closes its connection, unless `peer` is the local node or an
/// active neighbor.
let private disconnectNonActivePeer (state: State) (peer: Endpoint) = async {
    let isSelf = (peer = state.Self)
    let keepConnection = isSelf || Set.contains peer state.ActiveView
    if not keepConnection then
        do! state.Send(peer, Disconnect(alive=true, respond=false))
        do! state.Output.Disconnect(peer)
}
/// Entry point of the membership protocol: applies one envelope to `state` and returns the new state.
/// Messages that do not belong to the membership protocol leave the state unchanged.
let handle (state: State) (e: Envelope) : Async<State> = async {
    let sender = e.Peer
    let transition =
        match e.Message with
        | DoShuffle -> doShuffle state
        | Disconnect(alive, respond) -> onDisconnect state sender alive respond
        | Neighbor highPriority -> onNeighbor state sender highPriority
        | ShuffleReply nodes -> onShuffleReply state nodes
        | Shuffle shuffle -> onShuffle state shuffle sender
        | ForwardJoin(peer, ttl) -> onForwardJoin state peer sender ttl
        | Join -> onJoin state sender
        | _ -> async.Return state
    let! state = transition
    // make sure to dispose the connection from sender if it's not a part of active view
    if e.Message.CanInit then
        do! disconnectNonActivePeer state sender
    return state
}
/// Reference: https://core.ac.uk/download/pdf/32330596.pdf
/// NOTES: Plumtree proposes to send IHave and Graft messages after some timeout, but this is not implemented yet
module Plumtree =
/// Immutable broadcast-tree state of a single node; every handler returns an updated copy.
type State =
{ Self: Endpoint // ID/address of current node
EagerPushPeers: Set<Endpoint> // direct branches of peers to broadcast messages to
LazyPushPeers: Set<Endpoint> // peers that periodically receive message IHave to detect undelivered messages
LazyQueue: struct (MessageId * Round * Endpoint) list // lazy peers don't need to be informed right away, IHave can be sent in batches
Output: Reactor
Config: Config
ReceivedMessages: Map<MessageId, Gossip> // received messages are stashed for the purpose of redelivery. It may need to be pruned from time to time.
Missing: Set<Announcement> // messages that are confirmed to be not received
Timers: Set<MessageId> } // active timers
/// Creates a state with no peers, no queued announcements and no stored messages.
static member Create (config: Config) (out: Reactor) =
{ Self = config.LocalEndpoint
EagerPushPeers = Set.empty
LazyPushPeers = Set.empty
LazyQueue = []
Output = out
Config = config
ReceivedMessages = Map.empty
Missing = Set.empty
Timers = Set.empty }
/// Sends `msg` to `target`, wrapped in an envelope that names the local node as the sender.
member this.Send(target: Endpoint, msg: Message) = this.Output.Send(target, { Peer = this.Self; Message = msg })
/// Puts `peer` into the eager set and takes it out of the lazy set.
let private addEager peer (state: State) =
    let eager = Set.add peer state.EagerPushPeers
    let lazyPeers = Set.remove peer state.LazyPushPeers
    { state with EagerPushPeers = eager; LazyPushPeers = lazyPeers }
/// Puts `peer` into the lazy set and takes it out of the eager set.
let private addLazy peer (state: State) =
    let eager = Set.remove peer state.EagerPushPeers
    let lazyPeers = Set.add peer state.LazyPushPeers
    { state with EagerPushPeers = eager; LazyPushPeers = lazyPeers }
/// Immediately sends `gossip` to every eager peer except the local node and `sender`;
/// the sends are started in parallel and awaited together.
let private eagerPush (state: State) gossip sender = async {
    let excluded = Set.ofArray [| state.Self; sender |]
    let targets = state.EagerPushPeers - excluded
    let sends = targets |> Seq.map (fun peer -> state.Send(peer, Gossip gossip))
    let! _ = Async.Parallel sends
    return ()
}
/// Queues an announcement of `gossip` for every lazy peer except `sender`. The queue is turned
/// into batched `IHave` messages when the `Dispatch` trigger is handled.
let private lazyPush (state: State) (gossip: Gossip) sender = async {
    let enqueue queue peer = struct(gossip.Id, gossip.Round, peer) :: queue
    let recipients = Set.remove sender state.LazyPushPeers
    return { state with LazyQueue = Set.fold enqueue state.LazyQueue recipients }
}
/// Flushes the lazy queue: groups the queued announcements per peer, sends each peer a single
/// `IHave` with its batch (sends run in parallel) and returns the state with an empty queue.
let private onDispatch (state: State) = async {
    let batch acc (struct(id, round, peer)) =
        let pending = Map.tryFind peer acc |> Option.defaultValue []
        Map.add peer (struct(id, round) :: pending) acc
    let perPeer = List.fold batch Map.empty state.LazyQueue
    let sends =
        perPeer
        |> Seq.map (fun (KeyValue(peer, notes)) -> state.Send(peer, IHave(List.toArray notes)))
    let! _ = Async.Parallel sends
    return { state with LazyQueue = [] }
}
/// Starts a new broadcast of `data`: wraps it into a round-0 gossip with a fresh id, pushes it to
/// eager peers, queues announcements for lazy peers and remembers the gossip as received.
let private onBroadcast (state: State) (data: Binary) = async {
    let gossip = { Id = Guid.NewGuid(); Round = 0UL; Data = data }
    do! eagerPush state gossip state.Self
    let! queued = lazyPush state gossip state.Self
    let stored = Map.add gossip.Id gossip queued.ReceivedMessages
    return { queued with ReceivedMessages = stored }
}
//let private optimize (state: State) data messageId round sender = async {
// for m in state.Missing |> Set.filter (fun m -> m.MessageId = messageId) do
// if m.Round < round && round - m.Round >= state.Config.OptimizationThreshold then
// do! state.Send(m.Sender, Graft(0UL, m.Round))
// do! state.Send(sender, Prune)
//}
/// Handles a `Gossip` from `sender`.
/// First delivery: the gossip is forwarded with an incremented round (eagerly and via the lazy
/// queue), stored, the sender becomes an eager peer and the payload is delivered as `Received`.
/// Duplicate: the sender is moved to the lazy set and told to `Prune` the link.
let private onGossip (state: State) (gossip: Gossip) sender = async {
    let isDuplicate = Map.containsKey gossip.Id state.ReceivedMessages
    if not isDuplicate then
        let forwarded = { gossip with Round = gossip.Round+1UL }
        do! eagerPush state forwarded sender
        let! queued = lazyPush state forwarded sender
        let next =
            { addEager sender queued with
                ReceivedMessages = Map.add gossip.Id gossip state.ReceivedMessages }
        next.Output.Notify (Received gossip.Data)
        //TODO: schedule timer that will prune gossip after some period (~2x of expected time to broadcast gossip to all nodes)
        return next
    else
        let pruned = addLazy sender state
        do! pruned.Send(sender, Prune)
        return pruned
}
/// Handles `Prune`: the sender no longer wants eager pushes, so it is moved to the lazy set.
let private onPrune (state: State) sender = async {
    let demoted = addLazy sender state
    return demoted
}
/// Processes one `IHave` note from `sender`. If the announced message has not been received yet,
/// the announcement is recorded as missing and a timer is registered for it, so that it can later
/// be requested with a `Graft`. Announcements of already received messages are ignored.
let private iHave (state: State) (messageId: MessageId) (round: Round) sender =
    // only messages we do NOT hold can be missing
    if Map.containsKey messageId state.ReceivedMessages then state
    else
        let missing = { MessageId = messageId; Sender = sender; Round = round }
        { state with
            Timers = Set.add messageId state.Timers
            Missing = Set.add missing state.Missing }
/// Handles a batched `IHave` from `sender`: folds every note into the state and posts a `Timer`
/// message to the local node for each timer that was newly registered by this batch.
let private onIHave (state: State) notes sender = async {
    let record acc (struct(msgId, round)) = iHave acc msgId round sender
    let updated = Array.fold record state notes
    let fresh = updated.Timers - state.Timers
    for messageId in fresh do
        //TODO: schedule timer to send this message after config.IHaveTimeout
        do! state.Output.Send(state.Self, { Message = Timer messageId; Peer = state.Self })
    return updated
}
/// Looks up the first announcement (in set order) recorded for `msgId`.
/// Returns `Some (missing without that announcement, announcement)`, or `None` when nothing is
/// recorded for that message.
let private removeFirstAnnouncement (missing: Set<Announcement>) msgId =
    missing
    |> Seq.tryFind (fun m -> m.MessageId = msgId)
    |> Option.map (fun found -> Set.remove found missing, found)
/// Handles a `Timer` for `messageId`: takes one recorded announcement of that message, promotes
/// its sender to the eager set and asks it for the message with a `Graft`.
/// When no announcement is left for the message the timer is simply dropped.
let private onTimer (state: State) (messageId: MessageId) = async {
    match removeFirstAnnouncement state.Missing messageId with
    | None ->
        // nothing to graft from (e.g. the announcement was consumed already): retire the timer
        // instead of failing the whole message handler
        return { state with Timers = Set.remove messageId state.Timers }
    | Some (missing, announcement) ->
        let! timers = async {
            if Set.contains messageId state.Timers then return state.Timers
            else
                //TODO: schedule timer to send this message after config.GraftTimeout
                do! state.Output.Send(state.Self, { Message = Timer messageId; Peer = state.Self })
                return Set.add messageId state.Timers }
        let state = { addEager announcement.Sender { state with Missing = missing } with Timers = timers }
        do! state.Send(announcement.Sender, Graft(messageId, announcement.Round))
        return state
}
/// Handles `Graft`: the sender asks for the tree link to be restored, so it becomes an eager peer
/// again and, if the requested message is still stored, that gossip is sent to it.
let private onGraft (state: State) (messageId: MessageId) (round: Round) sender = async {
    let grafted = addEager sender state
    match Map.tryFind messageId grafted.ReceivedMessages with
    | Some gossip -> do! grafted.Send(sender, Gossip gossip)
    | None -> ()
    return grafted
}
/// Handles `Down`: forgets `peer` entirely - it is removed from the eager and lazy sets and every
/// announcement it made is discarded, since it can no longer be asked for those messages.
let private onNeighborDown (state: State) peer = async {
    return { state with
                // keep only the announcements made by other peers
                Missing = state.Missing |> Set.filter (fun m -> m.Sender <> peer)
                EagerPushPeers = Set.remove peer state.EagerPushPeers
                LazyPushPeers = Set.remove peer state.LazyPushPeers }
}
/// Handles `Up`: a new neighbor starts out as an eager peer.
let private onNeighborUp (state: State) peer = async {
    let eager = Set.add peer state.EagerPushPeers
    return { state with EagerPushPeers = eager }
}
/// Entry point of the broadcast protocol: applies one envelope to `state` and returns the new state.
/// Messages that do not belong to the broadcast protocol leave the state unchanged.
let handle (state: State) (e: Envelope) = async {
    let sender = e.Peer
    let transition =
        match e.Message with
        | Broadcast data -> onBroadcast state data
        | Gossip gossip -> onGossip state gossip sender
        | IHave notes -> onIHave state notes sender
        | Graft(msgId, round) -> onGraft state msgId round sender
        | Prune -> onPrune state sender
        | Timer msgId -> onTimer state msgId
        | Dispatch -> onDispatch state
        | Up -> onNeighborUp state sender
        | Down -> onNeighborDown state sender
        | _ -> async.Return state
    return! transition
}
/// A cluster node: a mailbox-backed actor that feeds every incoming envelope first to the
/// broadcast layer (`Plumtree`) and then to the membership layer (`Hyparview`).
/// `config` defaults to `Config.Default`, `random` to a fresh `Random`.
[<Sealed>]
type Peer(out: Reactor, ?config: Config, ?random: Random) =
    let config = config |> Option.defaultValue Config.Default
    let random = random |> Option.defaultWith Random
    let membership = Hyparview.State.Create config out random
    let broadcast = Plumtree.State.Create config out
    let actor = MailboxProcessor.Start(fun ctx ->
        let rec loop (membership: Hyparview.State) (broadcast: Plumtree.State) = async {
            let! (envelope, reply: AsyncReplyChannel<_> option) = ctx.Receive()
            // Handle the message inside its own async so that the recursive `return! loop` below
            // sits outside of try/with: a `return!` inside a try block is not a tail call and
            // would keep one exception-handler frame alive per processed message.
            let! outcome = async {
                try
                    let! nextBroadcast = Plumtree.handle broadcast envelope
                    let! nextMembership = Hyparview.handle membership envelope
                    return Ok (nextMembership, nextBroadcast)
                with err ->
                    return Error err }
            match outcome with
            | Ok (nextMembership, nextBroadcast) ->
                reply |> Option.iter (fun r -> r.Reply None)
                return! loop nextMembership nextBroadcast
            | Error err ->
                // a failed message leaves both states as they were before it was handled
                reply |> Option.iter (fun r -> r.Reply (Some err))
                return! loop membership broadcast
        }
        loop membership broadcast)
    /// Posts `envelope` to the actor and waits until it was handled; rethrows the handler's
    /// exception (with its original stack trace) on the caller's side.
    let send envelope = async {
        match! actor.PostAndAsyncReply(fun r -> (envelope, Some r)) with
        | None -> ()
        | Some e -> ExceptionDispatchInfo.Capture(e).Throw()
    }
    /// Endpoint of this node.
    member this.Endpoint = config.LocalEndpoint
    /// Fire-and-forget delivery of an envelope; handler failures are not reported to the caller.
    member this.Post message = actor.Post (message, None)
    /// Joins the cluster through the node at `endpoint`.
    member this.Join endpoint = send { Message = Join; Peer = endpoint }
    /// Disconnects from `endpoint`, asking it to disconnect as well.
    member this.Disconnect endpoint = send { Message = Disconnect(true, true); Peer = endpoint }
    /// Broadcasts `data` to the cluster.
    member this.Broadcast data = send { Message = Broadcast data; Peer = config.LocalEndpoint }
    /// Stops the underlying mailbox processor.
    member this.Dispose() =
        (actor :> IDisposable).Dispose()
    interface IDisposable with member this.Dispose() = this.Dispose()
module Tests
open System
open System.Threading
open System.Threading.Channels
open System.Threading.Tasks
open Protocols
open FSharp.Control.Tasks
open Expecto
/// In-memory test harness: routes envelopes between registered peers, tracks which endpoints are
/// "connected" and collects every `PeerEvent` into a channel that tests can read from.
[<Sealed>]
type TestEnv() =
    let sync = obj()
    let mutable peers = Map.empty
    let mutable connections = Set.empty
    let peerEvents = Channel.CreateUnbounded<_>()
    /// Records a connection between the two endpoints in both directions.
    let connect endpoint target =
        lock sync (fun () ->
            //printfn "Connecting %s to %s" endpoint target
            connections <- Set.add (endpoint, target) connections
            connections <- Set.add (target, endpoint) connections
        )
    /// Makes `peer` reachable under `endpoint`. Guarded by the same lock as the connection table,
    /// because reactors read and write this state from the peers' own threads.
    member this.Register(endpoint, peer) =
        lock sync (fun () -> peers <- Map.add endpoint peer peers)
    /// True when a connection between `a` and `b` is recorded in either direction.
    member this.AreConnected(a, b) =
        lock sync ( fun () -> Set.contains (a, b) connections || Set.contains (b, a) connections)
    /// Stream of `(endpoint, event)` pairs raised by all peers of this environment.
    member this.Events = peerEvents.Reader
    /// Builds the `Reactor` handed to the peer living at `endpoint`.
    member this.Reactor (endpoint: Endpoint) =
        { new Reactor with
            member this.Send(target, msg) = async {
                match Map.tryFind target peers with
                | Some (m: Peer) ->
                    if not (Set.contains (endpoint, target) connections) then
                        connect endpoint target
                    m.Post msg
                | None -> failwithf "Couldn't find target node: %O" target
            }
            member this.Disconnect(target) = async {
                lock sync (fun () ->
                    //printfn "Disconnecting %s from %s" endpoint target
                    connections <- Set.remove (endpoint, target) connections
                    connections <- Set.remove (target, endpoint) connections
                )
            }
            member this.Notify event =
                // The channel is unbounded, so a synchronous TryWrite is sufficient; it avoids
                // blocking on a ValueTask with GetAwaiter().GetResult().
                if not (peerEvents.Writer.TryWrite((endpoint, event))) then
                    invalidOp "peer event channel has been completed" }
/// Creates a peer wired to `env` and registers it under the endpoint taken from `config`.
let peer (env: TestEnv) (config: Config) (random: Random) =
    let endpoint = config.LocalEndpoint
    let node = new Peer(env.Reactor endpoint, config, random)
    env.Register(endpoint, node)
    node
/// Reads exactly `count` events from the environment, in arrival order; reading is cancelled by `ct`.
let poll count (env: TestEnv) (ct: CancellationToken) = task {
    let received = Array.zeroCreate count
    for slot in 0 .. count - 1 do
        let! event = env.Events.ReadAsync ct
        received.[slot] <- event
    return received
}
/// Connection-state assertions layered on top of Expecto's `Expect`.
[<RequireQualifiedAccess>]
module Expect =
    /// Asserts that every listed pair of endpoints is connected.
    let connected (env: TestEnv) connections =
        connections
        |> Seq.iter (fun (x, y) ->
            Expect.isTrue (env.AreConnected(x, y)) (sprintf "%s-%s should be connected" x y))
    /// Asserts that none of the listed pairs of endpoints is connected.
    let disconnected (env: TestEnv) connections =
        connections
        |> Seq.iter (fun (x, y) ->
            Expect.isFalse (env.AreConnected(x, y)) (sprintf "%s-%s should be diconnected" x y))
/// Membership-layer scenarios. Each test builds its own `TestEnv` and compares the events read
/// from it as sets, so the order in which peers raise them does not matter.
[<Tests>]
let hyparviewTests = testList "Hyparview" [
// Two nodes: joining raises NeighborUp on both sides, disconnecting raises NeighborDown on both sides.
testTask "join & leave" {
do! task {
let env = TestEnv()
use a = peer env (Config.Create "A") (Random())
use b = peer env (Config.Create "B") (Random())
// single 1000 ms budget shared by both polls below
use cancel = new CancellationTokenSource(1000)
do! a.Join "B"
let! actual = poll 2 env cancel.Token
let expected = Set.ofList [("A", NeighborUp "B"); ("B", NeighborUp "A")]
Expect.equal (Set.ofArray actual) expected "neighbor up should be notified by both joining nodes"
Expect.isTrue (env.AreConnected("A", "B")) "both parties should be connected"
do! b.Disconnect "A"
let! actual = poll 2 env cancel.Token
let expected = Set.ofList [("A", NeighborDown "B"); ("B", NeighborDown "A")]
Expect.equal (Set.ofArray actual) expected "neighbor down should be notified by both disconnecting nodes"
Expect.disconnected env [("A", "B")]
}
}
// Four nodes with an active view capped at 2: a fourth node joining a saturated node forces an eviction.
testTask "limit active view size" {
do! task {
let env = TestEnv()
// NOTE(review): the fixed seed presumably makes A's eviction choice (B, as asserted below) reproducible - confirm
use a = peer env ({ Config.Create "A" with ActiveViewCapacity = 2; PassiveViewCapacity = 3 }) (Random(1234))
use b = peer env ({ Config.Create "B" with ActiveViewCapacity = 2; PassiveViewCapacity = 3 }) (Random(1234))
use c = peer env ({ Config.Create "C" with ActiveViewCapacity = 2; PassiveViewCapacity = 3 }) (Random(1234))
use d = peer env ({ Config.Create "D" with ActiveViewCapacity = 2; PassiveViewCapacity = 3 }) (Random(1234))
use cancel = new CancellationTokenSource(1000)
do! a.Join "B"
do! a.Join "C"
do! b.Join "C"
// A-B-C are interconnected to each other (active view cap = 2)
let expected = Set.ofList [
("A", NeighborUp "B")
("A", NeighborUp "C")
("B", NeighborUp "C")
("B", NeighborUp "A")
("C", NeighborUp "B")
("C", NeighborUp "A")
]
let! actual = poll 6 env cancel.Token
Expect.equal (Set.ofArray actual) expected "neighbor up should be notified by all joining nodes"
Expect.connected env [("A", "B"); ("A", "C") ; ("B", "C")]
do! d.Join "A" // since D joins A and has no other neighbors, it has higher priority. A must disconnect from either B or C.
let expected = Set.ofList [
("D", NeighborUp "A")
("A", NeighborUp "D")
("A", NeighborDown "B")
("B", NeighborDown "A")
]
let! actual = poll 4 env cancel.Token
Expect.equal (Set.ofArray actual) expected "D should join with A, which should disconnect from B or C"
Expect.connected env [("A", "D"); ("A", "C") ; ("B", "C")]
Expect.disconnected env [("A", "B"); ("D", "B") ; ("D", "C")]
}
}
]
/// UTF-8 encodes a string into a message payload.
let private encode (x: string) : Binary =
    let utf8 = System.Text.Encoding.UTF8
    utf8.GetBytes(x)
/// Decodes a UTF-8 message payload back into a string.
let private decode (x: Binary) : string =
    let utf8 = System.Text.Encoding.UTF8
    utf8.GetString(x)
/// Endpoint name of the i-th test peer: "P" followed by the index.
let private addr i = sprintf "P%s" (string i)
/// Keeps reading events until every element of `expected` has been seen (other events are
/// ignored). Fails with the list of still-missing events when `cancel` fires first.
let private receiveAllOf (env: TestEnv) (expected: Set<_>) (cancel) = task {
    let mutable pending = expected
    try
        while pending.Count > 0 do
            let! event = env.Events.ReadAsync(cancel)
            pending <- Set.remove event pending
    with :? OperationCanceledException ->
        failwithf "didn't received %i messages: %A" pending.Count pending
}
/// Builds a cluster of `n` peers named P0..P(n-1) and joins them into a ring (peer i joins peer
/// i+1, the last one joins P0). View capacities scale with log2 n. After a one second settling
/// delay, `n` events are read from the environment before the peers are returned.
let private setup env (n: int) = task {
    let makePeer i =
        let activeCapacity = int (Math.Log2(float n)) + 1
        let config =
            { Config.Create (addr i) with
                ActiveViewCapacity = activeCapacity
                PassiveViewCapacity = 6 * activeCapacity }
        peer env config (Random(1234))
    let peers = Array.init n makePeer
    for i in 0 .. n - 1 do
        let next = addr ((i+1) % n)
        do! peers.[i].Join next
    do! Task.Delay 1000 // wait for peers to stabilize connection
    let! _ = poll n env CancellationToken.None
    return peers
}
/// Broadcast-layer scenarios: a message broadcast by one peer must be delivered (as a `Received`
/// event) by every other peer of the cluster within a 10 second budget.
[<Tests>]
let plumtreeTests = testList "Plumtree" [
testTask "test broadcast message in big cluster" {
do! task {
let n = 1000
let env = TestEnv()
let! peers = setup env n // simulate broadcasting a message in the cluster of thousand nodes
let msg = encode "hello"
do! (Array.head peers).Broadcast msg
// every peer except the broadcaster (P0) must deliver the message
let expected = Array.init (n-1) (fun i -> (addr (i+1), Received msg)) |> Set.ofArray
use cancel = new CancellationTokenSource 10_000
do! receiveAllOf env expected cancel.Token
}
}
testTask "broadcast works even when nodes have disconnected" {
do! task {
let n = 100
let env = TestEnv()
let! peers = setup env n // simulate broadcasting a message in the cluster of hundred nodes
let msg = encode "hello"
do! (Array.head peers).Broadcast msg
// establish initial broadcast tree
let expected = Array.init (n-1) (fun i -> (addr (i+1), Received msg)) |> Set.ofArray
use cancel = new CancellationTokenSource 10_000
do! receiveAllOf env expected cancel.Token
// cut links of the middle peer, then let it broadcast
// NOTE(review): the hard-coded peer names are presumably that peer's neighbors under the seeded setup - confirm
let dc = peers.[n/2]
do! dc.Disconnect "P15"
do! dc.Disconnect "P21"
do! dc.Disconnect "P47"
do! dc.Disconnect "P53"
do! dc.Disconnect "P7"
do! dc.Disconnect "P71"
do! dc.Broadcast msg
// every peer except the broadcaster itself must deliver the second broadcast
let expected =
Array.init n (fun i -> (addr i, Received msg))
|> Set.ofArray
|> Set.filter (fun (addr, _) -> addr <> dc.Endpoint)
use cancel = new CancellationTokenSource 10_000
do! receiveAllOf env expected cancel.Token
}
}
]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment