Skip to content

Instantly share code, notes, and snippets.

@lnsp
Last active March 9, 2020 22:59
Show Gist options
  • Save lnsp/055944bb5a7c8818f3b0d27a76679602 to your computer and use it in GitHub Desktop.
Save lnsp/055944bb5a7c8818f3b0d27a76679602 to your computer and use it in GitHub Desktop.
package main
import (
"bufio"
"bytes"
"encoding/binary"
"flag"
"fmt"
"net"
"os"
)
// pid is the value of the -pid flag: the process ID this peer identifies
// itself with.
var pid = flag.Int("pid", 0, "unique process ID")

// size is the value of the -size flag: the number of peers. A message is
// only delivered once the processes 0..size-1 have all acknowledged it.
var size = flag.Int("size", 1, "number of peers")

// addr is the value of the -addr flag: the UDP address that is listened on
// and that packets are sent to.
var addr = flag.String("addr", "224.0.0.1:5000", "address to listen on")
// main executes run and, if it reports an error, prints that error to
// stderr and exits with status 1.
func main() {
	err := run()
	if err == nil {
		return
	}
	fmt.Fprintf(os.Stderr, "error: %v\n", err)
	os.Exit(1)
}
// run parses the command-line flags, starts the multicast listener in a
// background goroutine and then broadcasts every line read from standard
// input until the input ends.
//
// A failed send is reported on stderr and does not stop the loop. A failure
// of the listener terminates the whole process with exit status 1. The
// returned error is non-nil only when reading standard input itself fails.
//
// NOTE(review): Listen stores the resolved address from its own goroutine,
// so a line sent before that has happened is dialed with a nil address and
// fails — confirm whether startup should be synchronized.
func run() error {
	flag.Parse()
	brd := &Broadcaster{Process: uint16(*pid)}
	go func() {
		if err := brd.Listen(); err != nil {
			fmt.Fprintf(os.Stderr, "listen: %v\n", err)
			os.Exit(1)
		}
	}()

	scanner := bufio.NewScanner(os.Stdin)
	for scanner.Scan() {
		line := scanner.Text()
		if err := brd.Send([]byte(line)); err != nil {
			fmt.Fprintf(os.Stderr, "send: %v\n", err)
		}
	}
	// Scan returns false on EOF and on failure alike (for example a line
	// longer than the scanner's token limit); only Err tells them apart.
	if err := scanner.Err(); err != nil {
		return fmt.Errorf("reading stdin: %w", err)
	}
	return nil
}
// Broadcaster exchanges messages with its peers over UDP and delivers a
// message once it has been acknowledged by every peer.
type Broadcaster struct {
	// Process identifies this peer; Handle stamps it on the acks it sends.
	Process uint16

	// time is the local logical clock. Send increments it, Handle advances
	// it past the timestamp of every received non-ack message, and log
	// prints it as a prefix.
	time uint64

	// addr is the resolved group address. Listen sets it and broadcast
	// dials it.
	addr *net.UDPAddr
	// conn is the listening socket opened by Listen.
	conn *net.UDPConn

	// queue holds the received non-ack messages that are not delivered yet.
	queue []Msg
	// acks holds the ack messages received so far.
	acks []Msg
}
// NewBroadcaster returns a Broadcaster whose Process is pid. All other
// fields are left at their zero values.
func NewBroadcaster(pid uint16) *Broadcaster {
	brd := new(Broadcaster)
	brd.Process = pid
	return brd
}
// log formats its arguments like fmt.Sprintf and writes the result as one
// line to stderr, prefixed with the current logical time in hexadecimal.
func (brd *Broadcaster) log(format string, args ...interface{}) {
	fmt.Fprintf(os.Stderr, "[%06x] %s\n", brd.time, fmt.Sprintf(format, args...))
}
// Listen resolves the address given by the -addr flag, joins it as a UDP
// multicast group and then feeds every received datagram to Handle.
//
// Errors returned by Handle are logged and do not stop the loop. Listen
// returns only on failure: when the address cannot be resolved, the group
// cannot be joined, or a read from the socket fails. The socket is closed
// before Listen returns.
func (brd *Broadcaster) Listen() error {
	// Large enough for any UDP datagram. A smaller buffer would cut off long
	// packets, and an ack embeds the whole packet it acknowledges, so a
	// truncated ack would never match its message.
	const maxDatagramSize = 64 * 1024

	var err error
	brd.addr, err = net.ResolveUDPAddr("udp", *addr)
	if err != nil {
		return fmt.Errorf("failed to resolve addr: %w", err)
	}
	brd.conn, err = net.ListenMulticastUDP("udp", nil, brd.addr)
	if err != nil {
		return fmt.Errorf("failed to listen: %w", err)
	}
	defer brd.conn.Close()
	brd.log("listen: on addr %s", brd.addr)

	buf := make([]byte, maxDatagramSize)
	for {
		n, err := brd.conn.Read(buf)
		if err != nil {
			return fmt.Errorf("failed to receive: %w", err)
		}
		// Handle keeps references to the packet bytes in its queue, so it
		// gets a private copy while buf is reused for the next read.
		packet := make([]byte, n)
		copy(packet, buf[:n])
		if err := brd.Handle(packet); err != nil {
			brd.log("handle: %v", err)
		}
	}
}
// Handle processes one received packet.
//
// An ack is recorded. Any other message advances the local clock past the
// message's timestamp, is appended to the delivery queue and is answered
// with a broadcast ack that carries the raw packet as its content.
//
// Afterwards the queue is scanned in order and the first message that every
// peer has acknowledged is removed from the queue and printed to stdout. At
// most one message is delivered per call; messages still waiting for acks
// stay queued.
//
// It returns an error when raw cannot be decoded or the ack cannot be sent.
func (brd *Broadcaster) Handle(raw []byte) error {
	var msg Msg
	if err := Unmarshal(raw, &msg); err != nil {
		return fmt.Errorf("failed to unmarshal: %w", err)
	}
	brd.log("handle: packet from %d", msg.Process)

	if msg.Ack {
		brd.acks = append(brd.acks, msg)
	} else {
		brd.time = max(brd.time, msg.Time) + 1
		brd.queue = append(brd.queue, msg)
		ack := Msg{
			Ack:     true,
			Process: brd.Process,
			Time:    brd.time,
			Content: raw,
		}
		if err := brd.broadcast(ack); err != nil {
			return fmt.Errorf("failed to broadcast: %w", err)
		}
	}

	for i, pending := range brd.queue {
		if !brd.ackedByAll(pending) {
			continue
		}
		// Remove only the delivered entry: earlier entries that are still
		// waiting for acks must stay queued or they would never be delivered.
		brd.queue = append(brd.queue[:i], brd.queue[i+1:]...)
		fmt.Fprintf(os.Stdout, "[%06x] ack: %s\n", brd.time, string(pending.Content))
		return nil
	}
	return nil
}

// ackedByAll reports whether an ack for msg has been received from each of
// the processes 0..size-1. An ack belongs to msg when its content equals the
// marshaled form of msg. The first process found missing is logged.
func (brd *Broadcaster) ackedByAll(msg Msg) bool {
	rawMsg := msg.Marshal()
	acked := make(map[uint16]bool)
	for _, ack := range brd.acks {
		if bytes.Equal(ack.Content, rawMsg) {
			acked[ack.Process] = true
		}
	}
	for p := 0; p < *size; p++ {
		if !acked[uint16(p)] {
			brd.log("handle: %d has not ack yet", p)
			return false
		}
	}
	return true
}
// Send advances the local clock by one and broadcasts content as a non-ack
// message stamped with the new time and this broadcaster's process ID.
//
// It returns the error of the underlying broadcast, if any.
func (brd *Broadcaster) Send(content []byte) error {
	brd.time++
	msg := Msg{
		// Use the broadcaster's own identity rather than the global flag so
		// that messages and acks from one instance carry the same ID.
		Process: brd.Process,
		Time:    brd.time,
		Content: content,
	}
	return brd.broadcast(msg)
}
// broadcast marshals msg and writes it as a single datagram to the
// broadcaster's address over a freshly dialed UDP socket, which is closed
// again before returning. A successful send is logged together with the
// packet kind and its size in bytes.
func (brd *Broadcaster) broadcast(msg Msg) error {
	payload := msg.Marshal()

	sock, err := net.DialUDP("udp", nil, brd.addr)
	if err != nil {
		return fmt.Errorf("failed to dial: %v", err)
	}
	defer sock.Close()

	_, err = sock.Write(payload)
	if err != nil {
		return fmt.Errorf("failed to broadcast: %v", err)
	}

	switch {
	case msg.Ack:
		brd.log("broadcast: ack of size %d", len(payload))
	default:
		brd.log("broadcast: msg of size %d", len(payload))
	}
	return nil
}
// Msg is a single packet exchanged between peers.
type Msg struct {
	// Ack is true when the packet acknowledges another packet.
	Ack bool
	// Process is the ID of the peer that sent the packet.
	Process uint16
	// Time is the sender's logical clock value when the packet was created.
	Time uint64
	// Content is the payload. For an ack it is the raw packet being
	// acknowledged.
	Content []byte
}
// Marshal encodes msg into its wire format: one byte for Ack (1 or 0), two
// bytes for Process and eight bytes for Time, both big-endian, followed by
// the content bytes.
func (msg Msg) Marshal() []byte {
	var header [1 + 2 + 8]byte
	if msg.Ack {
		header[0] = 1
	}
	binary.BigEndian.PutUint16(header[1:3], msg.Process)
	binary.BigEndian.PutUint64(header[3:], msg.Time)

	var out bytes.Buffer
	out.Write(header[:])
	out.Write(msg.Content)
	return out.Bytes()
}
// Unmarshal decodes a packet into msg. The packet starts with an 11-byte
// header: one byte for Ack (non-zero means true), two bytes for Process and
// eight bytes for Time, both big-endian. The rest is the content.
//
// msg.Content refers to the tail of data and is not a copy. Unmarshal
// returns an error and leaves msg untouched when data is shorter than the
// header.
func Unmarshal(data []byte, msg *Msg) error {
	const headerSize = 1 + 2 + 8
	if len(data) < headerSize {
		return fmt.Errorf("buf is too small")
	}
	msg.Ack = data[0] != 0
	msg.Process = binary.BigEndian.Uint16(data[1:3])
	msg.Time = binary.BigEndian.Uint64(data[3:headerSize])
	msg.Content = data[headerSize:]
	return nil
}
// max returns the larger of a and b.
func max(a, b uint64) uint64 {
	if b >= a {
		return b
	}
	return a
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment