Skip to content

Instantly share code, notes, and snippets.

@nicerobot
Last active August 29, 2015 13:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nicerobot/9753246 to your computer and use it in GitHub Desktop.
Save nicerobot/9753246 to your computer and use it in GitHub Desktop.
Golang Pastry

Based on the main README and the docs, I wrote what I thought should have been a fairly simple peer that I can test on a single machine using multiple ports.

The first peer is started with a single port, the "cluster-port", which identifies the cluster that subsequent nodes will join.

Subsequent peers start by specifying two ports, the first being the new node's port, the second being the "cluster-port".

Messages are created by the initial node every second, and the message ID is simply a cycle of (currently 10) static values.

This example will launch five processes and watch the output of each node for 120 seconds (it seems at times it takes a long time for nodes to join.)

curl -ks https://gist.githubusercontent.com/nicerobot/9753246/raw/example.sh | bash -s 120 5

#!/bin/bash
# Builds peer-rand.go and runs a five-node cluster on ports 8080-8084,
# tailing every node's output.
#
# Usage: example.sh [WATCH_SECONDS] [KILL_DELAY_SECONDS]
#   WATCH_SECONDS       how long to watch the output before shutting down (default 120)
#   KILL_DELAY_SECONDS  pause before each process is killed (default 5)
go get github.com/secondbit/wendy || exit ${LINENO}
# NOTE(review): curl -k skips TLS certificate verification for code that is then built and run.
[ -f peer-rand.go ] || curl -ksO https://gist.githubusercontent.com/nicerobot/9753246/raw/peer-rand.go
# -f: stay quiet on a first run, when there are no old output files to remove.
rm -f 80*.out
pids=()
# Single quotes defer the expansion of pids until the trap fires, so every
# process started below (including tail) is covered. Errors from kill are
# silenced because some or all of the processes may already be gone.
trap 'kill "${pids[@]}" 2>/dev/null; exit 1' 0 1 2 3
# Without a binary there is nothing to run or watch.
go build peer-rand.go || exit ${LINENO}
# The first node defines the cluster; the other four join it through port 8080.
./peer-rand 8080 >8080.out 2>&1 & pids+=($!)
for i in 1 2 3 4; do
  ./peer-rand 808${i} 8080 >808${i}.out 2>&1 & pids+=($!)
done
tail -f 80*.out & pids+=($!)
sleep ${1:-120}
for p in "${pids[@]}"; do sleep ${2:-5}; echo "kill ${p}"; kill ${p}; done
package main
import (
"fmt"
"crypto/sha256"
"io"
"github.com/secondbit/wendy"
"math/rand"
"os"
"strconv"
"time"
"log"
)
// applicationCallback is the stateless value registered with the cluster via
// RegisterCallback; each of its methods logs the event it is handed.
type applicationCallback struct{}
// OnError logs err together with the callback value, surrounded by blank
// lines so the entry stands out in the node's output.
func (cb *applicationCallback) OnError(err error) {
	const layout = "\nERROR %v %v\n\n"
	log.Printf(layout, cb, err)
}
// OnDeliver logs the purpose, sender and key of msg, followed by its value
// rendered as text.
func (cb *applicationCallback) OnDeliver(msg wendy.Message) {
	payload := string(msg.Value[:])
	const layout = "Delivered %v %v %v %v\n"
	log.Printf(layout, msg.Purpose, msg.Sender, msg.Key, payload)
}
// OnForward logs msg and the ID of the next node, and always returns true.
// NOTE(review): true presumably tells wendy to go ahead and forward the
// message — confirm against the wendy Application documentation.
func (cb *applicationCallback) OnForward(msg *wendy.Message, nextId wendy.NodeID) bool {
	const layout = "Forward %v %v\n"
	log.Printf(layout, msg, nextId)
	return true
}
// OnNewLeaves logs the leaf set it is given.
func (cb *applicationCallback) OnNewLeaves(leafset []*wendy.Node) {
	const layout = "New %v\n"
	log.Printf(layout, leafset)
}
// OnNodeJoin logs the node it is given, prefixed with "Join".
func (cb *applicationCallback) OnNodeJoin(node wendy.Node) {
	const layout = "Join %v\n"
	log.Printf(layout, node)
}
// OnNodeExit logs the node it is given, prefixed with "Exit".
func (cb *applicationCallback) OnNodeExit(node wendy.Node) {
	const layout = "Exit %v\n"
	log.Printf(layout, node)
}
// OnHeartbeat logs the node it is given, prefixed with "Beat".
func (cb *applicationCallback) OnHeartbeat(node wendy.Node) {
	const layout = "Beat %v\n"
	log.Printf(layout, node)
}
// main runs a single wendy peer bound to 127.0.0.1.
//
// Usage: peer-rand PORT [CLUSTER-PORT]
//
// With only PORT, the peer is the initial node: it listens on PORT and sends
// one message per second. With CLUSTER-PORT as well, it listens on PORT and
// joins the cluster through the node at 127.0.0.1:CLUSTER-PORT.
// In both cases the process then blocks until it is killed.
func main() {
	// Most of this is taken directly from the README (with typos corrected).
	if len(os.Args) < 2 {
		log.Println("usage: peer-rand port [cluster-port]")
		os.Exit(2)
	}
	// The port for _this_ node is given explicitly instead of being chosen at
	// random so that the _cluster_ port is known.
	port := parsePort(os.Args[1])

	hostname, err := os.Hostname()
	if err != nil {
		log.Fatal(err)
	}
	idString := fmt.Sprintf("%v %v", port, hostname)
	log.Println(idString)
	// Hash the identity, as is done for message keys, so the ID source is
	// always 32 bytes. The raw "port hostname" string can be too short for a
	// NodeID when the hostname is short.
	idSum := sha256.Sum256([]byte(idString))
	id, err := wendy.NodeIDFromBytes(idSum[:])
	if err != nil {
		log.Fatal(err)
	}

	node := wendy.NewNode(id, "127.0.0.1", "127.0.0.1", "none", port)
	credentials := wendy.Passphrase("I <3 Gophers.")
	cluster := wendy.NewCluster(node, credentials)
	cluster.RegisterCallback(&applicationCallback{})
	go func() {
		defer cluster.Stop()
		log.Println("Listening")
		if err := cluster.Listen(); err != nil {
			// panic rather than log.Fatal so the deferred Stop still runs.
			panic(err.Error())
		}
	}()

	if len(os.Args) > 2 {
		// If there are two parameters, join the cluster.
		joinCluster(cluster, os.Args[2])
	} else {
		// Otherwise, send a random message to the cluster every second.
		go sendMessages(cluster)
	}

	// Block forever; the process is expected to be killed from outside.
	select {}
}

// parsePort converts arg to a port number. On a malformed number it logs the
// parse error and exits the process with status 2.
func parsePort(arg string) int {
	port, err := strconv.Atoi(arg)
	if err != nil {
		log.Println(err)
		os.Exit(2)
	}
	return port
}

// joinCluster joins the cluster through the node at 127.0.0.1 on the port
// named by portArg. A failed join is logged rather than treated as fatal, so
// the node keeps listening.
func joinCluster(cluster *wendy.Cluster, portArg string) {
	clusterPort := parsePort(portArg)
	log.Printf("cluster port %v\n", clusterPort)
	if err := cluster.Join("127.0.0.1", clusterPort); err != nil {
		log.Println(err)
	}
}

// sendMessages sends one message per second, forever. Message keys cycle
// through keyCycle fixed values (the SHA-256 of "0", "1", ...); each value is
// a random number followed by the current Unix time.
func sendMessages(cluster *wendy.Cluster) {
	const keyCycle = 10
	// NOTE(review): 16 is the purpose byte used for these messages; its
	// meaning is defined by the application — confirm no clash with wendy.
	const purpose = byte(16)

	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()
	n := 0
	for range ticker.C {
		h := sha256.New()
		// Writing to a hash.Hash never returns an error.
		io.WriteString(h, fmt.Sprintf("%d", n))
		n = (n + 1) % keyCycle
		key, err := wendy.NodeIDFromBytes(h.Sum(nil))
		if err != nil {
			panic(err.Error())
		}
		// Use the full 64-bit Unix time; an int32 would wrap in 2038.
		m := fmt.Sprintf("%v %v", rand.Int63(), time.Now().Unix())
		log.Println(m)
		msg := cluster.NewMessage(purpose, key, []byte(m))
		if err := cluster.Send(msg); err != nil {
			log.Println(err.Error())
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment