Skip to content

Instantly share code, notes, and snippets.

@upperwal
Last active May 2, 2023 08:50
Show Gist options
  • Save upperwal/dd7c5924f590aa5e635aa8b39768e48c to your computer and use it in GitHub Desktop.
Save upperwal/dd7c5924f590aa5e635aa8b39768e48c to your computer and use it in GitHub Desktop.
Binary tree based Broadcast
package main
/* Binary tree based Broadcast
* This program demonstrates a tree based broadcast to all the peers in a network.
* The program starts by arranging a set of peers in a tree structure where a node is
* connected to exactly two nodes, its children. This gives an almost complete binary tree.
*
* One-way flow of information:
* Data can only flow from top to bottom in the graph. So, if the root node initiates
* a broadcast, it will send the message to its children, who will then send it to their
* children. This way a message can reach all the nodes in O(log n) steps in the network.
*
* What if the message is originated from a non-root node?
* As the flow of messages is from top to bottom, the root node might never get a message if the
* broadcast is originated from one of the non-root nodes. This is handled by connecting all the nodes
* in the last level of the tree to each other in a binary tree manner, and the last node in the tree to the
* root node, such that it now becomes a cyclic graph.
*
* Connecting the last level of the tree (actually a graph) in its own binary tree speeds up the system.
* As the last level of a tree contains O(n/2) nodes, if all the leaf nodes were connected in a linear manner
* it would take O(n/2) steps to get to the last node, which would then send the message to the root
* node and start the binary broadcast. Instead, the nodes in the last level are connected in a binary tree
* of their own. Now a message from the leftmost node in the last level can reach the rightmost
* node in the last level in O(log(n/2)) steps. This speeds up the broadcast if the message is
* originated from a leaf node.
*/
import (
"crypto/sha256"
"encoding/binary"
"fmt"
"io"
"math"
"net"
"strconv"
"sync"
"time"
"github.com/gogo/protobuf/proto"
"github.com/upperwal/go-bcast/protocol"
)
const (
// portStart is the base TCP port: the node with rank r listens on
// 127.0.0.1:(portStart + r), and peers dial it at the same address.
portStart = 3000
)
// ProtoWConn pairs a protocol message with a connection: for a received
// packet it is the connection the packet was read from, and for a queued
// control response it is the connection the response is written to.
type ProtoWConn struct {
// Conn is the connection associated with the message.
Conn *net.Conn
// Data is the protocol message itself.
Data *protocol.Protocol
}
// Node represents a host: one participant in the broadcast network. Each node
// listens on its own local TCP port and keeps outgoing connections to exactly
// two peers.
type Node struct {
// rank is this node's index in the network; it also selects the listen port.
rank int
// universeSize is the total number of nodes in the network.
universeSize int
// connToPeer holds the two outgoing connections dialled by connectToPeer.
connToPeer [2]*net.Conn
// neighbours holds the ranks of the two peers this node dials.
neighbours [2]int
// outgoing queues messages that messageLoop writes to both peers.
outgoing chan *protocol.Protocol
// incoming queues packets read by handler until messageLoop dispatches them.
incoming chan *ProtoWConn
// outgoingControlRes queues control responses, each written back to the
// connection stored alongside it.
outgoingControlRes chan *ProtoWConn
// dupMessage records the SHA-256 hash of every frame already received so
// that repeated data packets can be dropped. Entries are never removed;
// they could be given a time bound.
dupMessage map[[32]byte]bool // Could have a time bound.
// mux is held while dupMessage is updated.
mux *sync.Mutex
}
// NewNode creates the host with rank r in a network of u nodes. It allocates
// the node's queues (each buffered to 10 entries) and its duplicate-message
// table, opens the node's listening socket, and starts the message loop in the
// background. Outgoing connections are not made here; call Start for that.
func NewNode(r, u int) *Node {
	node := new(Node)
	node.rank = r
	node.universeSize = u
	node.outgoing = make(chan *protocol.Protocol, 10)
	node.incoming = make(chan *ProtoWConn, 10)
	node.outgoingControlRes = make(chan *ProtoWConn, 10)
	node.dupMessage = make(map[[32]byte]bool)
	node.mux = new(sync.Mutex)

	node.setupSocket()
	go node.messageLoop()
	return node
}
// Start connects to the peers: it dials this node's two outgoing neighbours.
// main calls it only after every node has been constructed with NewNode, which
// is what opens each node's listening socket.
func (n *Node) Start() {
n.connectToPeer()
}
// setupSocket opens this node's TCP listener on 127.0.0.1:(portStart + rank)
// and starts accepting connections in the background.
//
// If the port cannot be bound, the error is reported and no accept loop is
// started: handing a nil listener to listen would panic on its first call.
func (n *Node) setupSocket() {
	addr := "127.0.0.1:" + strconv.Itoa(n.rank+portStart)
	l, err := net.Listen("tcp", addr)
	if err != nil {
		handleErr(err)
		return
	}
	go n.listen(l)
}
// connectToPeer works out which two peers this node forwards to, records their
// ranks in n.neighbours, and dials both.
//
// Which nodes to connect to:
//   - Ranks below completeTree form the upper binary tree: rank r forwards to
//     2r+1 and 2r+2.
//   - Ranks from completeTree upwards (the last level) form a second binary
//     tree of their own, numbered from completeTree, so a message can cross
//     the last level without visiting every node in it.
//   - The last rank closes the cycle: it forwards to the root (rank 0) and to
//     the first node of its own virtual tree.
//
// Peer ranks are taken modulo universeSize, so a node whose computed child
// does not exist wraps around to an existing node instead.
func (n *Node) connectToPeer() {
	completeTree := int(math.Pow(2, math.Ceil(math.Log2(float64(n.universeSize)))-1) - 1)

	// virtualStart is the rank at which this node's tree starts counting.
	virtualStart := 0
	if n.rank >= completeTree {
		virtualStart = completeTree
	}
	virtualRank := n.rank - virtualStart

	var peerOne, peerTwo int
	if n.rank == n.universeSize-1 {
		peerOne = 0
		peerTwo = virtualStart
	} else {
		peerOne = (2*virtualRank + 1 + virtualStart) % n.universeSize
		peerTwo = (peerOne + 1) % n.universeSize
	}
	n.neighbours[0] = peerOne
	n.neighbours[1] = peerTwo
	fmt.Println("Node ", n.rank, " >> ", peerOne, ", ", peerTwo)

	n.dialPeer(0, peerOne)
	n.dialPeer(1, peerTwo)
}

// dialPeer connects to the node with rank peer and stores the connection in
// n.connToPeer[slot]. On failure the error is reported and the slot is left
// empty; no reader goroutine is started, because reading from a nil
// connection would panic.
func (n *Node) dialPeer(slot, peer int) {
	conn, err := net.Dial("tcp", "127.0.0.1:"+strconv.Itoa(portStart+peer))
	if err != nil {
		handleErr(err)
		return
	}
	n.connToPeer[slot] = &conn
	go n.handler(&conn)
}
// messageLoop is the node's single dispatcher goroutine. It runs forever and
// serves three queues:
//   - outgoing: the message is serialised once and written to both peers;
//   - incoming: the packet is handed to the processor matching its Type, each
//     in its own goroutine so a slow processor cannot stall the loop;
//   - outgoingControlRes: the response is serialised and written back to the
//     connection stored with it.
//
// A message that cannot be serialised is reported and dropped rather than sent
// as an empty frame. A peer slot or response connection that was never
// established is reported and skipped instead of being dereferenced.
func (n *Node) messageLoop() {
	for {
		select {
		case o := <-n.outgoing:
			//fmt.Println("Outgoing ", o.Type)
			data, err := proto.Marshal(o)
			if err != nil {
				handleErr(err)
				continue
			}
			for slot, peer := range n.connToPeer {
				if peer == nil || *peer == nil {
					handleErr(fmt.Errorf("node %d: peer slot %d is not connected, message not sent", n.rank, slot))
					continue
				}
				n.write(*peer, data)
			}
		case i := <-n.incoming:
			//fmt.Println("Incoming ", i.Data.Type)
			switch i.Data.Type {
			case protocol.Protocol_DATA_PACKET:
				go n.processDataPacket(i)
			case protocol.Protocol_CONTROL_REQ:
				go n.processControlPacket(i)
			case protocol.Protocol_CONTROL_RES:
				go n.processControlRes(i)
			}
		case o := <-n.outgoingControlRes:
			data, err := proto.Marshal(o.Data)
			if err != nil {
				handleErr(err)
				continue
			}
			if o.Conn == nil || *o.Conn == nil {
				handleErr(fmt.Errorf("node %d: control response has no connection, message not sent", n.rank))
				continue
			}
			n.write(*o.Conn, data)
		}
	}
}
// write sends data to conn as a single frame: an 8-byte little-endian length
// followed by the payload, which is the layout handler reads.
//
// Header and payload go out in one Write call, so a failure can never leave a
// header on the wire with the payload still being attempted after it. Any
// write error is reported instead of being silently discarded.
func (n *Node) write(conn net.Conn, data []byte) {
	frame := make([]byte, 8+len(data))
	binary.LittleEndian.PutUint64(frame, uint64(len(data)))
	copy(frame[8:], data)
	if _, err := conn.Write(frame); err != nil {
		handleErr(err)
	}
}
// processDataPacket handles a received data packet. A BROADCAST packet is
// logged and then queued unchanged on n.outgoing so it is forwarded to this
// node's two peers; any other data packet type is ignored.
//
// The packet comes off the network, so a message that claims to be a data
// packet but carries no DataPacket body is reported and dropped rather than
// dereferenced.
func (n *Node) processDataPacket(i *ProtoWConn) {
	if i == nil || i.Data == nil || i.Data.DataPacket == nil {
		handleErr(fmt.Errorf("node %d: dropping data packet with no body", n.rank))
		return
	}
	switch i.Data.DataPacket.Type {
	case protocol.Protocol_DataPacket_BROADCAST:
		fmt.Println("Incoming Broadcast: [This Peer: "+strconv.Itoa(n.rank)+"] Data: "+i.Data.DataPacket.Data, " | Forwarding to: ", n.neighbours[0], " / ", n.neighbours[1])
		// Read the data and send the packet as it is to your children.
		n.outgoing <- i.Data
	}
}
// processControlPacket answers a received control request. For GET_NEIGHBOURS
// it queues a CONTROL_RES carrying this node's rank and the ranks of its two
// neighbours (formatted "a, b"), addressed to the connection the request
// arrived on. Other request types are ignored.
//
// The packet comes off the network, so a message that claims to be a control
// request but carries no ControlReq body is reported and dropped rather than
// dereferenced.
func (n *Node) processControlPacket(i *ProtoWConn) {
	if i == nil || i.Data == nil || i.Data.ControlReq == nil {
		handleErr(fmt.Errorf("node %d: dropping control request with no body", n.rank))
		return
	}
	switch i.Data.ControlReq.Type {
	case protocol.Protocol_ControlReq_GET_NEIGHBOURS:
		fmt.Println("Received GET_NEIGHBOUR request")
		res := &protocol.Protocol{
			Type: protocol.Protocol_CONTROL_RES,
			ControlRes: &protocol.Protocol_ControlRes{
				From: int64(n.rank),
				Data: strconv.Itoa(n.neighbours[0]) + ", " + strconv.Itoa(n.neighbours[1]),
			},
		}
		n.outgoingControlRes <- &ProtoWConn{
			Conn: i.Conn,
			Data: res,
		}
	}
}
// processControlRes prints a received control response: the rank of the peer
// that answered and the neighbour list it reported.
//
// The packet comes off the network, so a message that claims to be a control
// response but carries no ControlRes body is reported and dropped rather than
// dereferenced.
func (n *Node) processControlRes(i *ProtoWConn) {
	if i == nil || i.Data == nil || i.Data.ControlRes == nil {
		handleErr(fmt.Errorf("node %d: dropping control response with no body", n.rank))
		return
	}
	fmt.Println("GET_NEIGHBOUR Response Neighbours: [Peer ", i.Data.ControlRes.From, "]: ", i.Data.ControlRes.Data)
}
// listen accepts connections on l and starts a handler goroutine for each one.
// The listener is closed when the loop exits.
//
// An Accept error is reported and ends the loop. Carrying on would start a
// handler on a nil connection, and a listener that has been closed returns an
// error on every call, so retrying would spin forever.
func (n *Node) listen(l net.Listener) {
	defer l.Close()
	for {
		conn, err := l.Accept()
		if err != nil {
			handleErr(err)
			return
		}
		go n.handler(&conn)
	}
}
// handler reads frames from conn until the connection ends, and closes it on
// return. Each frame is an 8-byte little-endian length followed by that many
// bytes of serialised protocol.Protocol. Every decoded message is queued on
// n.incoming together with conn, except data packets whose exact bytes have
// been seen before, which are dropped so that a broadcast stops circulating.
//
// Error handling:
//   - A read error ends the handler (a clean EOF silently, anything else after
//     being reported). Ignoring it would loop forever on zero-length frames
//     once the peer disconnects.
//   - A frame longer than maxFrameSize ends the handler, so a corrupt or
//     hostile length prefix cannot force a huge allocation.
//   - A frame that does not decode is reported and skipped.
func (n *Node) handler(conn *net.Conn) {
	// maxFrameSize bounds the payload length accepted from the wire.
	const maxFrameSize = 16 << 20

	if conn == nil || *conn == nil {
		return
	}
	c := *conn
	defer c.Close()

	header := make([]byte, 8)
	for {
		if _, err := io.ReadFull(c, header); err != nil {
			if err != io.EOF {
				handleErr(err)
			}
			return
		}
		size := binary.LittleEndian.Uint64(header)
		if size > maxFrameSize {
			handleErr(fmt.Errorf("node %d: frame of %d bytes exceeds the %d byte limit, closing connection", n.rank, size, maxFrameSize))
			return
		}
		buf := make([]byte, size)
		if _, err := io.ReadFull(c, buf); err != nil {
			handleErr(err)
			return
		}

		// A fresh message per frame: the previous one has been handed to other
		// goroutines and must not be overwritten underneath them.
		p := &protocol.Protocol{}
		if err := proto.Unmarshal(buf, p); err != nil {
			handleErr(err)
			continue
		}

		// Only data packets are ever dropped as duplicates, so only they need
		// to be remembered. The check and the insert happen under one lock so
		// concurrent handlers neither race on the map nor both forward the
		// same packet.
		if p.Type == protocol.Protocol_DATA_PACKET {
			hash := sha256.Sum256(buf)
			n.mux.Lock()
			seen := n.dupMessage[hash]
			n.dupMessage[hash] = true
			n.mux.Unlock()
			if seen {
				continue
			}
		}

		n.incoming <- &ProtoWConn{
			Conn: conn,
			Data: p,
		}
	}
}
// handleErr reports err on standard output, prefixed with "Error: ". A nil
// error is ignored. It only reports: the caller carries on afterwards.
func handleErr(err error) {
	if err == nil {
		return
	}
	fmt.Println("Error: ", err)
}
// getNeighboursOfNeighbour asks this node's peers which nodes they are
// connected to. It queues a GET_NEIGHBOURS control request on n.outgoing, from
// where the message loop sends it to both peers; it does not wait for replies.
func (n *Node) getNeighboursOfNeighbour() {
	req := &protocol.Protocol_ControlReq{
		Type: protocol.Protocol_ControlReq_GET_NEIGHBOURS,
	}
	msg := &protocol.Protocol{
		Type:       protocol.Protocol_CONTROL_REQ,
		ControlReq: req,
	}
	n.outgoing <- msg
}
// Broadcast starts a broadcast of msg from this node. It queues a BROADCAST
// data packet on n.outgoing, from where the message loop sends it to this
// node's two peers; every peer that receives it forwards it on to its own two
// peers. The call returns once the packet is queued, not once it is delivered.
func (n *Node) Broadcast(msg string) {
	packet := &protocol.Protocol_DataPacket{
		Type: protocol.Protocol_DataPacket_BROADCAST,
		Data: msg,
	}
	envelope := &protocol.Protocol{
		Type:       protocol.Protocol_DATA_PACKET,
		DataPacket: packet,
	}
	n.outgoing <- envelope
}
// main runs the demo: it creates every node (which opens all the listening
// sockets), then connects each node to its peers, broadcasts one message from
// a chosen node, and finally sends a GET_NEIGHBOURS control request from that
// same node. The sleeps give the background goroutines time to print their
// output before the program exits.
func main() {
	universe := 15 // universe size

	nodes := make([]*Node, universe)
	for rank := range nodes {
		nodes[rank] = NewNode(rank, universe)
	}
	for _, node := range nodes {
		node.Start()
	}

	initiator := 2     // This peer will start the broadcast.
	payload := "hello" // Broadcast message.
	fmt.Println("Broadcast Initiated by: peer ", initiator, " with data: ", payload)
	nodes[initiator].Broadcast(payload)
	time.Sleep(2 * time.Second)

	// A node can also handle control messages, such as querying a peer for
	// the peers it is connected to.
	fmt.Println("Sending GET_NEIGHBOUR request")
	nodes[initiator].getNeighboursOfNeighbour()
	time.Sleep(4 * time.Second)
}

Tree based distributed bcast across different nodes

go get github.com/upperwal/go-bcast

This program shows how a broadcast is implemented across different nodes in a distributed system which are arranged in a binary tree like topology.

Any node can initiate a broadcast or send a message to be broadcasted. The message will flow from top to bottom of the tree as shown in the diagram.

Main

Each node is only responsible for sending the message to its 2 children, who will then send it to their children.

The algorithm takes only O(log n) steps, instead of the O(n) steps needed if a single node sends the message to everyone.

Because the message flows in only one direction, i.e. top to bottom, if node 4 broadcasts a message, no node except 9 and 10 will receive it.

To remove this constraint leaf nodes should be able to propagate the message to the root of the tree so that the message can reach every node.

This is done by connecting the last node of the tree (in this case, node 14) to the root node and all leaf nodes are connected to each other such that they form a binary tree to propagate messages in non-linear time/steps.

This is shown in the figure below.

Leaf

Some connections (11, 12, 13) are not shown intentionally.

The last level can have up to O(n/2) nodes (more than half of the nodes of the entire graph), so propagating messages through it in a linear manner would be expensive. Instead, all leaf nodes are also arranged in a binary tree (unrelated to the first tree) for faster message propagation. This also helps in getting the message to the last node (node 14), which propagates the message to the root node.

Note: Some nodes might get duplicate messages which are dropped.

The diagrams below show how different nodes initiate a bcast and the path followed by the message.

One

This one is a trivial case where the root node starts the bcast and the message is sent to everyone in O(log n) steps.

Two

In this case a non-leaf node initiates the bcast. The message reaches some leaf node which sends it to the last node which pushes it to the root node from where it can follow the normal path.

Three

In this case the leftmost leaf node initiates the bcast. Imagine the number of nodes being in the thousands. A binary tree in the leaf level can help us optimise the flow of the message to all the leaf nodes and ultimately to the root node.

syntax = "proto3";
package protocol;
// Protocol is the single envelope exchanged between nodes. The type field says
// which kind of message this is; the receiver reads the sub-message that
// matches it (dataPacket, controlReq or controlRes).
message Protocol {
// Type identifies the kind of message carried by the envelope.
enum Type {
CONTROL_REQ = 0;
CONTROL_RES = 1;
DATA_PACKET = 2;
}
// ControlReq is a control request sent to a peer.
message ControlReq {
enum Type {
// GET_NEIGHBOURS asks the receiver which peers it is connected to.
GET_NEIGHBOURS = 0;
}
Type type = 1;
}
// ControlRes is the reply to a ControlReq.
message ControlRes {
// from is the rank of the node that produced the response.
int64 from = 1;
// data is the response body; for GET_NEIGHBOURS it is the responder's two
// neighbour ranks formatted as "a, b".
string data = 2;
}
// DataPacket carries application data.
message DataPacket {
enum Type {
// BROADCAST marks data that each receiver forwards on to its own peers.
BROADCAST = 0;
}
Type type = 1;
// data is the broadcast message text.
string data = 2;
}
Type type = 1;
DataPacket dataPacket = 2;
ControlReq controlReq = 3;
ControlRes controlRes = 4;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment