Skip to content

Instantly share code, notes, and snippets.

@PirosB3
Created February 15, 2016 22:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save PirosB3/eaebfbb8feee3ddb1aaf to your computer and use it in GitHub Desktop.
Save PirosB3/eaebfbb8feee3ddb1aaf to your computer and use it in GitHub Desktop.
package main
import (
cryptorand "crypto/rand"
"math/rand"
"errors"
"sync"
"fmt"
"time"
)
// Message types carried in Command.Type. The values are assigned by iota,
// so their order here defines their numeric value.
const (
// GetCurrentCounter asks the receiving node for its current counter;
// Run answers (with some probability) using ReceiveCurrentCounter.
GetCurrentCounter = iota
// ReceiveCurrentCounter is the reply to GetCurrentCounter; the payload is
// the sender's counter, which Run records in OthersCurrent.
ReceiveCurrentCounter
// Ping is declared but not handled by any code visible in this file.
Ping
// SetNewCounter proposes a new counter value; Run adopts the payload when
// it is larger than the receiver's current counter.
SetNewCounter
// ReceiveSetNewCounterConfirm acknowledges a SetNewCounter proposal; Run
// counts it when the payload matches the node's pending proposal.
ReceiveSetNewCounterConfirm
)
// pseudo_uuid builds a UUID-shaped identifier from 16 bytes read from
// crypto/rand, rendered as upper-case hex in 8-4-4-4-12 groups.
//
// If the random source fails, the error is printed to stdout and the empty
// string is returned.
func pseudo_uuid() (uuid string) {
	var raw [16]byte
	if _, err := cryptorand.Read(raw[:]); err != nil {
		fmt.Println("Error: ", err)
		return ""
	}
	uuid = fmt.Sprintf("%X-%X-%X-%X-%X", raw[0:4], raw[4:6], raw[6:8], raw[8:10], raw[10:])
	return uuid
}
// Command is a single message exchanged between nodes over their channels.
type Command struct {
	// Type is one of the message-type constants (GetCurrentCounter, ...).
	Type int
	// Payload carries the counter value associated with the message.
	Payload int
	// From is the Id of the sending node.
	From string
	// To is the Id of the intended recipient.
	To string
}
// Node is one participant in the counter-agreement simulation. It owns an
// inbox channel and holds the inbox channels of the peers it is connected to.
type Node struct {
	// Id identifies the node; it is the key peers use in their Outgoing map.
	Id string

	// Current is the counter value this node currently holds.
	Current int
	// CandidateNodeConfirms counts confirmations received for the pending
	// proposal started by BroadcastNewCounter.
	CandidateNodeConfirms int
	// PotentialNewId is the value of the pending proposal.
	PotentialNewId int

	// OthersCurrent maps a node Id to the counter value it last reported.
	OthersCurrent map[string]int

	// OthersCurrentLock is taken around accesses to OthersCurrent.
	OthersCurrentLock sync.Mutex
	// CandidateNodeConfirmsLock is taken around the proposal bookkeeping
	// (CandidateNodeConfirms, PotentialNewId) and counter updates in Run.
	CandidateNodeConfirmsLock sync.Mutex

	// Incoming is this node's inbox, drained by the goroutine started in Run.
	Incoming chan Command
	// Outgoing maps a peer Id to that peer's inbox channel.
	Outgoing map[string]chan Command
}
// Connect wires two nodes together in both directions: each node stores the
// other's inbox channel in its Outgoing map, keyed by the other's Id.
func Connect(n1, n2 *Node) {
	n1.Outgoing[n2.Id], n2.Outgoing[n1.Id] = n2.Incoming, n1.Incoming
}
// NewNode allocates a node with a freshly generated Id, a counter of 1,
// empty peer tables and an unbuffered inbox. The node is not running until
// Run is called and has no peers until Connect is used.
func NewNode() *Node {
	node := new(Node)
	node.Id = pseudo_uuid()
	node.Current = 1
	node.OthersCurrent = map[string]int{}
	node.Incoming = make(chan Command)
	node.Outgoing = map[string]chan Command{}
	return node
}
// Run starts the node's message loop in a background goroutine and returns
// immediately. The loop handles every Command arriving on n.Incoming and
// ends when that channel is closed.
//
// Handled message types:
//   - GetCurrentCounter: replies with ReceiveCurrentCounter carrying
//     n.Current, but only about 40% of the time (simulated message loss).
//   - ReceiveCurrentCounter: records the sender's counter in OthersCurrent.
//   - SetNewCounter: adopts the proposed value when it is larger than
//     n.Current and confirms about 50% of the time; otherwise logs an error.
//   - ReceiveSetNewCounterConfirm: counts the confirmation when it matches
//     the pending proposal (PotentialNewId); otherwise logs an error.
//
// Any other message type (including Ping) is ignored.
func (n *Node) Run() {
	// send delivers c to the peer named in c.To. A missing map entry would
	// yield a nil channel, and sending on a nil channel blocks forever, so
	// unknown peers are reported and skipped instead.
	send := func(c Command) {
		inbox, ok := n.Outgoing[c.To]
		if !ok {
			fmt.Println("ERROR:", n.Id, "has no connection to", c.To)
			return
		}
		inbox <- c
	}

	go func() {
		for data := range n.Incoming {
			switch data.Type {
			case GetCurrentCounter:
				reply := Command{Type: ReceiveCurrentCounter, From: n.Id, To: data.From, Payload: n.Current}
				if rand.Int31n(100) < 40 {
					send(reply)
				}

			case ReceiveCurrentCounter:
				n.OthersCurrentLock.Lock()
				n.OthersCurrent[data.From] = data.Payload
				n.OthersCurrentLock.Unlock()

			case SetNewCounter:
				// Decide and update under the lock, but release it before
				// the (blocking) channel send so the mutex is never held
				// while waiting on another goroutine.
				n.CandidateNodeConfirmsLock.Lock()
				accepted := n.Current < data.Payload
				if accepted {
					n.Current = data.Payload
				}
				n.CandidateNodeConfirmsLock.Unlock()

				if !accepted {
					fmt.Println("ERROR:", n.Id, "has bigger ID than", data.From)
					continue
				}
				if rand.Int31n(100) < 50 {
					send(Command{
						Type:    ReceiveSetNewCounterConfirm,
						From:    n.Id,
						To:      data.From,
						Payload: data.Payload,
					})
				}

			case ReceiveSetNewCounterConfirm:
				n.CandidateNodeConfirmsLock.Lock()
				matches := n.PotentialNewId == data.Payload
				if matches {
					n.CandidateNodeConfirms++
				}
				n.CandidateNodeConfirmsLock.Unlock()

				if matches {
					fmt.Println(n.Id, ": Received confirm from", data.From)
				} else {
					fmt.Println("ERROR:", n.Id, "node confirm is not for current ID")
				}
			}
		}
	}()
}
// GetNextId tries to allocate the next counter value through this node.
//
// It first reads the largest counter reported by a quorum (ReadGlobalId),
// proposes that value plus one to all peers (BroadcastNewCounter) and, if
// enough peers confirm, adopts the value locally and returns it.
//
// On failure it returns -1 and a non-nil error: either the error from
// ReadGlobalId or a "could not reach consensus" error when the proposal was
// not confirmed by enough peers.
func (n *Node) GetNextId() (int, error) {
	currentBiggest, err := n.ReadGlobalId()
	if err != nil {
		return -1, err
	}

	nextCandidateId := currentBiggest + 1
	if !n.BroadcastNewCounter(nextCandidateId) {
		return -1, errors.New("could not reach consensus")
	}

	// Current is also updated by the Run goroutine under this lock, so take
	// it here as well.
	n.CandidateNodeConfirmsLock.Lock()
	n.Current = nextCandidateId
	n.CandidateNodeConfirmsLock.Unlock()
	return nextCandidateId, nil
}
// BroadcastNewCounter proposes NewCounter to every connected peer and
// reports whether enough of them confirmed.
//
// It resets the confirmation count, records the proposal as PotentialNewId
// and sends a SetNewCounter command to each peer, all while holding
// CandidateNodeConfirmsLock. It then waits 300 ms for confirmations to be
// counted by the Run goroutine and returns true when the count is at least
// len(n.Outgoing)/2 + 1.
func (n *Node) BroadcastNewCounter(NewCounter int) bool {
	n.CandidateNodeConfirmsLock.Lock()
	n.CandidateNodeConfirms = 0
	n.PotentialNewId = NewCounter
	for peer, inbox := range n.Outgoing {
		inbox <- Command{Type: SetNewCounter, From: n.Id, To: peer, Payload: NewCounter}
	}
	n.CandidateNodeConfirmsLock.Unlock()

	// Give peers time to answer.
	time.Sleep(300 * time.Millisecond)

	n.CandidateNodeConfirmsLock.Lock()
	defer n.CandidateNodeConfirmsLock.Unlock()

	// More than half of the peers must have confirmed.
	needed := len(n.Outgoing)/2 + 1
	return n.CandidateNodeConfirms >= needed
}
// ReadGlobalId asks every connected peer for its current counter and returns
// the largest value seen, including this node's own counter.
//
// The table of reported counters is reset, seeded with this node's own
// value, and a GetCurrentCounter command is sent to each peer. After a 10 ms
// wait the replies recorded by the Run goroutine are inspected. If fewer
// than len(n.Outgoing)/2 + 1 entries (own entry included) are present, it
// returns -1 and a "could not reach consensus" error.
func (n *Node) ReadGlobalId() (int, error) {
	// Reset the table and send the requests under the lock. The node's own
	// entry is written here too: the Run goroutine writes to the same map as
	// replies arrive, so every write must be guarded.
	n.OthersCurrentLock.Lock()
	n.OthersCurrent = make(map[string]int)
	n.OthersCurrent[n.Id] = n.Current
	for address, inbox := range n.Outgoing {
		inbox <- Command{Type: GetCurrentCounter, From: n.Id, To: address}
	}
	n.OthersCurrentLock.Unlock()

	// Wait 10 ms for replies to arrive.
	time.Sleep(10 * time.Millisecond)

	n.OthersCurrentLock.Lock()
	defer n.OthersCurrentLock.Unlock()

	seen := len(n.OthersCurrent)
	if seen < len(n.Outgoing)/2+1 {
		return -1, errors.New("could not reach consensus")
	}

	biggest := -1
	for _, counter := range n.OthersCurrent {
		if counter > biggest {
			biggest = counter
		}
	}
	return biggest, nil
}
func main() {
var wg sync.WaitGroup
nodes := make([]*Node, 10)
for i:=0; i < 10; i++ {
n := NewNode()
n.Run()
nodes[i] = n
}
for i:=0; i < 10; i++ {
for j := i+1; j < 10; j++ {
Connect(nodes[i], nodes[j])
}
}
wg.Add(1)
for i := 0; i < 2000; i++ {
idx := rand.Int() % len(nodes)
node := nodes[idx]
value, err := node.GetNextId()
if err == nil {
fmt.Println("New ID is:", value)
} else {
fmt.Println(err)
}
//<- time.After(time.Millisecond * time.Duration(rand.Int63n(2)))
}
wg.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment