Skip to content

Instantly share code, notes, and snippets.

@d4l3k
Last active February 5, 2019 19:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save d4l3k/d3e6d40495ff8bcc6e58515892816352 to your computer and use it in GitHub Desktop.
Save d4l3k/d3e6d40495ff8bcc6e58515892816352 to your computer and use it in GitHub Desktop.
ZMQ TCP Push/Pull message drop on tcp reset
tristanr@tristanr-arch ~/D/zmqtest> go build -v .; and sudo ./zmqtest
2019/02/05 11:55:44 zmqtest.go:93: sent 0
2019/02/05 11:55:44 zmqtest.go:124: received 0
2019/02/05 11:55:44 zmqtest.go:93: sent 100000
2019/02/05 11:55:44 zmqtest.go:124: received 100000
2019/02/05 11:55:44 zmqtest.go:93: sent 200000
2019/02/05 11:55:44 zmqtest.go:93: sent 300000
2019/02/05 11:55:44 zmqtest.go:124: received 200000
2019/02/05 11:55:44 zmqtest.go:93: sent 400000
2019/02/05 11:55:44 zmqtest.go:93: sent 500000
2019/02/05 11:55:44 zmqtest.go:124: received 300000
2019/02/05 11:55:44 zmqtest.go:93: sent 600000
2019/02/05 11:55:44 zmqtest.go:93: sent 700000
2019/02/05 11:55:44 zmqtest.go:124: received 400000
2019/02/05 11:55:44 zmqtest.go:93: sent 800000
2019/02/05 11:55:44 zmqtest.go:93: sent 900000
2019/02/05 11:55:44 zmqtest.go:124: received 500000
2019/02/05 11:55:44 zmqtest.go:93: sent 1000000
2019/02/05 11:55:44 zmqtest.go:93: sent 1100000
2019/02/05 11:55:44 zmqtest.go:124: received 600000
2019/02/05 11:55:44 zmqtest.go:93: sent 1200000
2019/02/05 11:55:45 zmqtest.go:124: received 700000
2019/02/05 11:55:45 zmqtest.go:93: sent 1300000
2019/02/05 11:55:45 zmqtest.go:93: sent 1400000
2019/02/05 11:55:45 zmqtest.go:124: received 800000
2019/02/05 11:55:45 zmqtest.go:93: sent 1500000
2019/02/05 11:55:45 zmqtest.go:124: received 900000
2019/02/05 11:55:45 zmqtest.go:93: sent 1600000
2019/02/05 11:55:45 zmqtest.go:93: sent 1700000
2019/02/05 11:55:45 zmqtest.go:124: received 1000000
2019/02/05 11:55:45 zmqtest.go:93: sent 1800000
2019/02/05 11:55:45 zmqtest.go:124: received 1100000
2019/02/05 11:55:45 zmqtest.go:93: sent 1900000
2019/02/05 11:55:45 zmqtest.go:93: sent 2000000
2019/02/05 11:55:45 zmqtest.go:135: blocked
2019/02/05 11:55:45 zmqtest.go:93: sent 2100000
2019/02/05 11:55:45 zmqtest.go:93: sent 2200000
2019/02/05 11:55:46 zmqtest.go:141: unblocked
2019/02/05 11:55:46 zmqtest.go:121: got 2284500; expected 1183880
package main
import (
"context"
"fmt"
"log"
"net"
"os/exec"
"strconv"
"sync"
"time"
"github.com/pebbe/zmq4"
)
// main adds the source file name and line number to every log line, then
// executes run and terminates the process with the error if run fails.
func main() {
	log.SetFlags(log.Lshortfile | log.Flags())

	err := run()
	if err != nil {
		log.Fatalf("%+v", err)
	}
}
// GetOutboundIP returns the preferred outbound IP address of this machine.
//
// It dials 8.8.8.8:80 over UDP and reports the local address assigned to that
// socket. If the dial fails the process is terminated via log.Fatal.
func GetOutboundIP() net.IP {
	probe, dialErr := net.Dial("udp", "8.8.8.8:80")
	if dialErr != nil {
		log.Fatal(dialErr)
	}
	defer probe.Close()

	return probe.LocalAddr().(*net.UDPAddr).IP
}
// cmd runs the program name with the given args and waits for it to finish.
// Any error reported by Run is fatal: it is logged and the process exits.
func cmd(name string, args ...string) {
	command := exec.Command(name, args...)
	runErr := command.Run()
	if runErr == nil {
		return
	}
	log.Fatal(runErr)
}
// run reproduces message loss on a ZeroMQ PUSH/PULL pair connected over TCP
// when the connection is reset.
//
// It binds a PUSH socket to port 5333 on the address returned by
// GetOutboundIP, connects a PULL socket to the same endpoint, and starts two
// goroutines: one sends an ever-increasing integer sequence, the other
// receives it and verifies that no number is skipped. After one second the
// main goroutine inserts iptables rules that reject TCP traffic to port 5333
// with a TCP reset, removes them one second later, waits one more second, and
// then shuts everything down.
//
// A gap in the received sequence, an unexpected socket error, or a failing
// iptables invocation terminates the process via log.Fatal. run itself
// returns an error only for socket setup and teardown failures.
func run() error {
	zmq, err := zmq4.NewContext()
	if err != nil {
		return err
	}
	// Check SetIpv6's own result; testing the outer err here would silently
	// ignore a failure to apply the option.
	if err := zmq.SetIpv6(true); err != nil {
		return err
	}

	push, err := zmq.NewSocket(zmq4.PUSH)
	if err != nil {
		return err
	}
	ip := GetOutboundIP().String()
	endpoint := fmt.Sprintf("tcp://%s:5333", ip)
	if err := push.Bind(endpoint); err != nil {
		return err
	}

	pull, err := zmq.NewSocket(zmq4.PULL)
	if err != nil {
		return err
	}
	if err := pull.Connect(endpoint); err != nil {
		return err
	}

	var wg sync.WaitGroup
	ctx, cancel := context.WithCancel(context.Background())

	// pusher: sends 0, 1, 2, ... until cancelled or the socket is closed.
	wg.Add(1)
	go func() {
		defer wg.Done()
		var i int
		for ctx.Err() == nil {
			_, err := push.Send(strconv.Itoa(i), 0)
			if err == zmq4.ErrorSocketClosed {
				// The main goroutine closed push during shutdown.
				break
			}
			if err != nil {
				log.Fatal(err)
			}
			if i%100000 == 0 {
				log.Printf("sent %d", i)
			}
			i++
		}
		log.Printf("send %d", i)
	}()

	// puller: receives the sequence and aborts on the first gap.
	wg.Add(1)
	go func() {
		defer wg.Done()
		var i int
		for ctx.Err() == nil {
			// Non-blocking receive, so cancellation is re-checked on every
			// iteration even when no message is pending.
			data, err := pull.Recv(zmq4.DONTWAIT)
			// NOTE(review): matching on the message text depends on the
			// platform's errno string; comparing zmq4.AsErrno(err) against
			// syscall.EAGAIN would be sturdier but needs a syscall import.
			if err != nil && err.Error() == "resource temporarily unavailable" {
				continue
			}
			if err != nil {
				log.Fatal(err)
			}
			got, err := strconv.Atoi(data)
			if err != nil {
				log.Fatal(err)
			}
			if got != i {
				log.Fatalf("got %d; expected %d", got, i)
			}
			if i%100000 == 0 {
				log.Printf("received %d", i)
			}
			i++
		}
		log.Printf("received %d", i)
	}()

	// The same rule specification is used to insert and later delete each
	// rule, so only the rules added here are removed. Flushing the whole
	// table (iptables -F / -X) would also wipe unrelated firewall rules on
	// the host.
	rejectRule := []string{"-p", "TCP", "--dport", "5333", "-j", "REJECT", "--reject-with", "tcp-reset"}
	chains := []string{"OUTPUT", "INPUT"}

	time.Sleep(1 * time.Second)
	for _, chain := range chains {
		cmd("iptables", append([]string{"-I", chain}, rejectRule...)...)
	}
	log.Printf("blocked")

	time.Sleep(1 * time.Second)
	for _, chain := range chains {
		cmd("iptables", append([]string{"-D", chain}, rejectRule...)...)
	}
	log.Printf("unblocked")

	time.Sleep(1 * time.Second)
	cancel()
	// Closing push makes later Send calls report ErrorSocketClosed, which the
	// pusher treats as a shutdown signal.
	// NOTE(review): push may still be in use by the pusher goroutine at this
	// point; confirm that closing it from here is safe for zmq4 sockets.
	if err := push.Close(); err != nil {
		return err
	}
	log.Printf("waiting")
	wg.Wait()

	// Both goroutines have exited, so nothing else is using pull any more.
	return pull.Close()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment