Last active
March 9, 2020 22:59
-
-
Save lnsp/055944bb5a7c8818f3b0d27a76679602 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"bufio" | |
"bytes" | |
"encoding/binary" | |
"flag" | |
"fmt" | |
"net" | |
"os" | |
) | |
var ( | |
pid = flag.Int("pid", 0, "unique process ID") | |
size = flag.Int("size", 1, "number of peers") | |
addr = flag.String("addr", "224.0.0.1:5000", "address to listen on") | |
) | |
func main() { | |
if err := run(); err != nil { | |
fmt.Fprintf(os.Stderr, "error: %v\n", err) | |
os.Exit(1) | |
} | |
} | |
func run() error { | |
flag.Parse() | |
brd := &Broadcaster{Process: uint16(*pid)} | |
go func() { | |
if err := brd.Listen(); err != nil { | |
fmt.Fprintf(os.Stderr, "listen: %v\n", err) | |
os.Exit(1) | |
} | |
}() | |
scanner := bufio.NewScanner(os.Stdin) | |
for scanner.Scan() { | |
line := scanner.Text() | |
if err := brd.Send([]byte(line)); err != nil { | |
fmt.Fprintf(os.Stderr, "send: %v\n", err) | |
} | |
} | |
return nil | |
} | |
type Broadcaster struct { | |
addr *net.UDPAddr | |
conn *net.UDPConn | |
queue []Msg | |
acks []Msg | |
time uint64 | |
Process uint16 | |
} | |
func NewBroadcaster(pid uint16) *Broadcaster { | |
return &Broadcaster{ | |
Process: pid, | |
} | |
} | |
func (brd *Broadcaster) log(format string, args ...interface{}) { | |
str := fmt.Sprintf(format, args...) | |
fmt.Fprintf(os.Stderr, "[%06x] %s\n", brd.time, str) | |
} | |
func (brd *Broadcaster) Listen() error { | |
var err error | |
brd.addr, err = net.ResolveUDPAddr("udp", *addr) | |
if err != nil { | |
return fmt.Errorf("failed to resolve addr: %v", err) | |
} | |
brd.conn, err = net.ListenMulticastUDP("udp", nil, brd.addr) | |
if err != nil { | |
return fmt.Errorf("failed to listen: %v", err) | |
} | |
brd.log("listen: on addr %s", brd.addr) | |
for { | |
buf := make([]byte, 1024) | |
n, err := brd.conn.Read(buf) | |
if err != nil { | |
return fmt.Errorf("failed to receive: %v", err) | |
} | |
if err := brd.Handle(buf[:n]); err != nil { | |
brd.log("handle: %v", err) | |
} | |
} | |
} | |
func (brd *Broadcaster) Handle(raw []byte) error { | |
var msg Msg | |
if err := Unmarshal(raw, &msg); err != nil { | |
return fmt.Errorf("failed to unmarshal: %v", err) | |
} | |
brd.log("handle: packet from %d", msg.Process) | |
if msg.Ack { | |
brd.acks = append(brd.acks, msg) | |
} else { | |
brd.time = max(brd.time, msg.Time) + 1 | |
brd.queue = append(brd.queue, msg) | |
ack := Msg{ | |
Ack: true, | |
Process: brd.Process, | |
Time: brd.time, | |
Content: raw, | |
} | |
if err := brd.broadcast(ack); err != nil { | |
return fmt.Errorf("failed to broadcast: %v", err) | |
} | |
} | |
for i := 0; i < len(brd.queue); i++ { | |
var ( | |
oldestMsg = brd.queue[i] | |
rawOldestMsg = oldestMsg.Marshal() | |
) | |
acks := make(map[uint16]bool) | |
for _, ack := range brd.acks { | |
if bytes.Equal(ack.Content, rawOldestMsg) { | |
acks[ack.Process] = true | |
} | |
} | |
ackByAll := true | |
for i := 0; i < *size && ackByAll; i++ { | |
if !acks[uint16(i)] { | |
brd.log("handle: %d has not ack yet", i) | |
ackByAll = false | |
} | |
} | |
if !ackByAll { | |
continue | |
} | |
brd.queue = brd.queue[i+1:] | |
fmt.Fprintf(os.Stdout, "[%06x] ack: %s\n", brd.time, string(oldestMsg.Content)) | |
return nil | |
} | |
return nil | |
} | |
func (brd *Broadcaster) Send(content []byte) error { | |
brd.time++ | |
msg := Msg{ | |
Process: uint16(*pid), | |
Time: brd.time, | |
Content: content, | |
} | |
return brd.broadcast(msg) | |
} | |
func (brd *Broadcaster) broadcast(msg Msg) error { | |
raw := msg.Marshal() | |
conn, err := net.DialUDP("udp", nil, brd.addr) | |
if err != nil { | |
return fmt.Errorf("failed to dial: %v", err) | |
} | |
defer conn.Close() | |
if _, err := conn.Write(raw); err != nil { | |
return fmt.Errorf("failed to broadcast: %v", err) | |
} | |
if msg.Ack { | |
brd.log("broadcast: ack of size %d", len(raw)) | |
} else { | |
brd.log("broadcast: msg of size %d", len(raw)) | |
} | |
return nil | |
} | |
type Msg struct { | |
Ack bool | |
Process uint16 | |
Time uint64 | |
Content []byte | |
} | |
func (msg Msg) Marshal() []byte { | |
buf := new(bytes.Buffer) | |
binary.Write(buf, binary.BigEndian, msg.Ack) | |
binary.Write(buf, binary.BigEndian, msg.Process) | |
binary.Write(buf, binary.BigEndian, msg.Time) | |
buf.Write(msg.Content) | |
return buf.Bytes() | |
} | |
func Unmarshal(data []byte, msg *Msg) error { | |
if len(data) < 7 { | |
return fmt.Errorf("buf is too small") | |
} | |
buf := bytes.NewBuffer(data) | |
binary.Read(buf, binary.BigEndian, &msg.Ack) | |
binary.Read(buf, binary.BigEndian, &msg.Process) | |
binary.Read(buf, binary.BigEndian, &msg.Time) | |
msg.Content = buf.Bytes() | |
return nil | |
} | |
func max(a, b uint64) uint64 { | |
if a > b { | |
return a | |
} | |
return b | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment