Last active
February 5, 2019 19:57
-
-
Save d4l3k/d3e6d40495ff8bcc6e58515892816352 to your computer and use it in GitHub Desktop.
ZMQ TCP Push/Pull message drop on TCP reset
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
tristanr@tristanr-arch ~/D/zmqtest> go build -v .; and sudo ./zmqtest | |
2019/02/05 11:55:44 zmqtest.go:93: sent 0 | |
2019/02/05 11:55:44 zmqtest.go:124: received 0 | |
2019/02/05 11:55:44 zmqtest.go:93: sent 100000 | |
2019/02/05 11:55:44 zmqtest.go:124: received 100000 | |
2019/02/05 11:55:44 zmqtest.go:93: sent 200000 | |
2019/02/05 11:55:44 zmqtest.go:93: sent 300000 | |
2019/02/05 11:55:44 zmqtest.go:124: received 200000 | |
2019/02/05 11:55:44 zmqtest.go:93: sent 400000 | |
2019/02/05 11:55:44 zmqtest.go:93: sent 500000 | |
2019/02/05 11:55:44 zmqtest.go:124: received 300000 | |
2019/02/05 11:55:44 zmqtest.go:93: sent 600000 | |
2019/02/05 11:55:44 zmqtest.go:93: sent 700000 | |
2019/02/05 11:55:44 zmqtest.go:124: received 400000 | |
2019/02/05 11:55:44 zmqtest.go:93: sent 800000 | |
2019/02/05 11:55:44 zmqtest.go:93: sent 900000 | |
2019/02/05 11:55:44 zmqtest.go:124: received 500000 | |
2019/02/05 11:55:44 zmqtest.go:93: sent 1000000 | |
2019/02/05 11:55:44 zmqtest.go:93: sent 1100000 | |
2019/02/05 11:55:44 zmqtest.go:124: received 600000 | |
2019/02/05 11:55:44 zmqtest.go:93: sent 1200000 | |
2019/02/05 11:55:45 zmqtest.go:124: received 700000 | |
2019/02/05 11:55:45 zmqtest.go:93: sent 1300000 | |
2019/02/05 11:55:45 zmqtest.go:93: sent 1400000 | |
2019/02/05 11:55:45 zmqtest.go:124: received 800000 | |
2019/02/05 11:55:45 zmqtest.go:93: sent 1500000 | |
2019/02/05 11:55:45 zmqtest.go:124: received 900000 | |
2019/02/05 11:55:45 zmqtest.go:93: sent 1600000 | |
2019/02/05 11:55:45 zmqtest.go:93: sent 1700000 | |
2019/02/05 11:55:45 zmqtest.go:124: received 1000000 | |
2019/02/05 11:55:45 zmqtest.go:93: sent 1800000 | |
2019/02/05 11:55:45 zmqtest.go:124: received 1100000 | |
2019/02/05 11:55:45 zmqtest.go:93: sent 1900000 | |
2019/02/05 11:55:45 zmqtest.go:93: sent 2000000 | |
2019/02/05 11:55:45 zmqtest.go:135: blocked | |
2019/02/05 11:55:45 zmqtest.go:93: sent 2100000 | |
2019/02/05 11:55:45 zmqtest.go:93: sent 2200000 | |
2019/02/05 11:55:46 zmqtest.go:141: unblocked | |
2019/02/05 11:55:46 zmqtest.go:121: got 2284500; expected 1183880 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"log" | |
"net" | |
"os/exec" | |
"strconv" | |
"sync" | |
"time" | |
"github.com/pebbe/zmq4" | |
) | |
func main() { | |
log.SetFlags(log.Flags() | log.Lshortfile) | |
if err := run(); err != nil { | |
log.Fatalf("%+v", err) | |
} | |
} | |
// GetOutboundIP reports the preferred outbound IP of this machine.
//
// It opens a UDP connection toward 8.8.8.8:80 and returns the IP of the local
// address that connection is using. If the dial fails the process is
// terminated via log.Fatal.
func GetOutboundIP() net.IP {
	sock, dialErr := net.Dial("udp", "8.8.8.8:80")
	if dialErr != nil {
		log.Fatal(dialErr)
	}
	defer sock.Close()

	// The dial above uses the "udp" network, so the local address is asserted
	// to be a *net.UDPAddr.
	return sock.LocalAddr().(*net.UDPAddr).IP
}
// cmd executes the program name with the given args and waits for it to
// finish. Any error reported by Run terminates the process via log.Fatal.
func cmd(name string, args ...string) {
	proc := exec.Command(name, args...)
	runErr := proc.Run()
	if runErr == nil {
		return
	}
	log.Fatal(runErr)
}
func run() error { | |
zmq, err := zmq4.NewContext() | |
if err != nil { | |
return err | |
} | |
if zmq.SetIpv6(true); err != nil { | |
return err | |
} | |
push, err := zmq.NewSocket(zmq4.PUSH) | |
if err != nil { | |
return err | |
} | |
ip := GetOutboundIP().String() | |
endpoint := fmt.Sprintf("tcp://%s:5333", ip) | |
if err := push.Bind(endpoint); err != nil { | |
return err | |
} | |
pull, err := zmq.NewSocket(zmq4.PULL) | |
if err != nil { | |
return err | |
} | |
if err := pull.Connect(endpoint); err != nil { | |
return err | |
} | |
var wg sync.WaitGroup | |
ctx, cancel := context.WithCancel(context.Background()) | |
// pusher | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
var i int | |
loop: | |
for { | |
if err := ctx.Err(); err != nil { | |
break | |
} | |
if _, err := push.Send(strconv.Itoa(i), 0); err == zmq4.ErrorSocketClosed { | |
break loop | |
} else if err != nil { | |
log.Fatal(err) | |
} | |
if i%100000 == 0 { | |
log.Printf("sent %d", i) | |
} | |
i++ | |
} | |
log.Printf("send %d", i) | |
}() | |
// puller | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
var i int | |
for { | |
if err := ctx.Err(); err != nil { | |
break | |
} | |
data, err := pull.Recv(zmq4.DONTWAIT) | |
if err != nil && err.Error() == "resource temporarily unavailable" { | |
continue | |
} else if err != nil { | |
log.Fatal(err) | |
} | |
got, err := strconv.Atoi(data) | |
if err != nil { | |
log.Fatal(err) | |
} | |
if got != i { | |
log.Fatalf("got %d; expected %d", got, i) | |
} | |
if i%100000 == 0 { | |
log.Printf("received %d", i) | |
} | |
i++ | |
} | |
log.Printf("received %d", i) | |
}() | |
time.Sleep(1 * time.Second) | |
cmd("iptables", "-I", "OUTPUT", "-p", "TCP", "--dport", "5333", "-j", "REJECT", "--reject-with", "tcp-reset") | |
cmd("iptables", "-I", "INPUT", "-p", "TCP", "--dport", "5333", "-j", "REJECT", "--reject-with", "tcp-reset") | |
log.Printf("blocked") | |
time.Sleep(1 * time.Second) | |
cmd("iptables", "-F") | |
cmd("iptables", "-X") | |
log.Printf("unblocked") | |
time.Sleep(1 * time.Second) | |
cancel() | |
if err := push.Close(); err != nil { | |
return err | |
} | |
log.Printf("waiting") | |
wg.Wait() | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment