@wirepair (last active May 14, 2017)
A UDP client & server where clients send packets at 10Hz. Can this be optimized further?
// An attempt at an optimized UDP client/server implementation where clients send 10 packets per second.
// run the server: go build && main -server -num 5000
// run the client: go build && main -num 5000
// I was only able to get 9000 clients sending for 30 seconds with 0 packet loss on Windows;
// after that I started getting drops.
//
// author: isaac dawson @ https://twitter.com/_wirepair
package main

import (
	"encoding/binary"
	"errors"
	"flag"
	"github.com/pkg/profile"
	"log"
	"math/rand"
	"net"
	"os"
	"os/signal"
	"sync"
	"sync/atomic"
	"time"
)
const (
	MAX_PACKET_BYTES = 1220
	SOCKET_BUF_SIZE  = 1024 * 1024
)
var serverMode bool
var maxClients int
var runTime float64
var runProfile bool
var randomizeStart bool
var clientTotalSend uint64
var clientTotalRecv uint64
var addr = &net.UDPAddr{IP: net.ParseIP("::1"), Port: 40000}
func init() {
	flag.BoolVar(&serverMode, "server", false, "pass this flag to run the server")
	flag.BoolVar(&runProfile, "prof", false, "pass this flag to enable profiling")
	flag.BoolVar(&randomizeStart, "rand", false, "pass this flag to randomize client startups")
	flag.IntVar(&maxClients, "num", 64, "number of clients to serve or to create")
	flag.Float64Var(&runTime, "runtime", 5.0, "how long to run clients for/clear client buffer in seconds")
}
// our struct for passing data and client addresses around
type netcodeData struct {
	data []byte
	from *net.UDPAddr
}
// allows for supporting custom handlers
type NetcodeRecvHandler func(data *netcodeData)
type NetcodeConn struct {
	conn          *net.UDPConn       // the underlying connection
	closeCh       chan struct{}      // used for closing the connection/signaling
	isClosed      bool               // is this connection open/closed?
	maxBytes      int                // maximum allowed bytes for read/write
	xmitBuf       sync.Pool          // re-use recv buf to reduce allocs
	recvSize      int                // socket recv buffer size
	sendSize      int                // socket send buffer size
	recvHandlerFn NetcodeRecvHandler // allow custom recv handlers
}
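
// note: isClosed is written by Close and read by Write/WriteTo from
// different goroutines with no synchronization, so it is technically a
// data race. a minimal sketch of a fix, assuming we keep the flag at all,
// would be to swap the bool for an atomic int32:
//
//	// replace isClosed with: closed int32 (1 == closed)
//	atomic.StoreInt32(&c.closed, 1)             // in Close
//	if atomic.LoadInt32(&c.closed) == 1 { ... } // guard in Write/WriteTo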
// Creates a new netcode connection
func NewNetcodeConn() *NetcodeConn {
	c := &NetcodeConn{}
	c.closeCh = make(chan struct{})
	c.isClosed = true
	c.maxBytes = MAX_PACKET_BYTES
	return c
}
// set a custom recv handler (must be called before Dial or Listen)
func (c *NetcodeConn) SetRecvHandler(recvHandlerFn NetcodeRecvHandler) {
	c.recvHandlerFn = recvHandlerFn
}
// Write to the connection
func (c *NetcodeConn) Write(b []byte) (int, error) {
	if c.isClosed {
		return -1, errors.New("unable to write, socket has been closed")
	}
	return c.conn.Write(b)
}
// Write to an address (only usable via Listen)
func (c *NetcodeConn) WriteTo(b []byte, to *net.UDPAddr) (int, error) {
	if c.isClosed {
		return -1, errors.New("unable to write, socket has been closed")
	}
	return c.conn.WriteToUDP(b, to)
}
// Shutdown time.
func (c *NetcodeConn) Close() error {
	if !c.isClosed {
		close(c.closeCh)
	}
	c.isClosed = true
	return c.conn.Close()
}
// Dial the server
func (c *NetcodeConn) Dial(address *net.UDPAddr) error {
	var err error
	if c.recvHandlerFn == nil {
		return errors.New("packet handler must be set before calling Dial")
	}
	c.closeCh = make(chan struct{})
	c.conn, err = net.DialUDP(address.Network(), nil, address)
	if err != nil {
		return err
	}
	c.sendSize = SOCKET_BUF_SIZE
	c.recvSize = SOCKET_BUF_SIZE
	c.create()
	return nil
}
// Listen for connections on address
func (c *NetcodeConn) Listen(address *net.UDPAddr) error {
	var err error
	if c.recvHandlerFn == nil {
		return errors.New("packet handler must be set before calling Listen")
	}
	c.conn, err = net.ListenUDP(address.Network(), address)
	if err != nil {
		return err
	}
	// scale the socket buffers with the number of clients we expect to serve
	c.sendSize = SOCKET_BUF_SIZE * maxClients
	c.recvSize = SOCKET_BUF_SIZE * maxClients
	c.create()
	return nil
}
// setup xmit buffer pool, socket read/write sizes and kick off readloop
func (c *NetcodeConn) create() {
	c.isClosed = false
	c.xmitBuf.New = func() interface{} {
		return make([]byte, c.maxBytes)
	}
	c.conn.SetReadBuffer(c.recvSize)
	c.conn.SetWriteBuffer(c.sendSize)
	go c.readLoop()
}
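
// note: the SetReadBuffer/SetWriteBuffer errors are ignored above, and on
// Linux the kernel silently clamps the requested sizes to net.core.rmem_max /
// net.core.wmem_max anyway, so the effective buffers may be far smaller than
// asked for. a rough sanity check on Linux (sketch only, needs io/ioutil):
//
//	if b, err := ioutil.ReadFile("/proc/sys/net/core/rmem_max"); err == nil {
//		log.Printf("kernel rmem_max: %s", b)
//	}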
// read blocks, so this must be called from a goroutine
func (c *NetcodeConn) receiver() {
	for {
		if err := c.read(); err != nil {
			// reads start failing once the socket is closed; stop instead
			// of spinning and spamming the log on a dead connection.
			select {
			case <-c.closeCh:
				return
			default:
			}
			log.Printf("error reading data from socket: %s\n", err)
			continue
		}
		select {
		case <-c.closeCh:
			return
		default:
		}
	}
}
// read does the actual connection read call and verifies we received
// between 1 and maxBytes bytes before we bother to dispatch the data
// to the recvHandlerFn.
func (c *NetcodeConn) read() error {
	var n int
	var from *net.UDPAddr
	var err error
	data := c.xmitBuf.Get().([]byte)[:c.maxBytes]
	n, from, err = c.conn.ReadFromUDP(data)
	if err != nil {
		return err
	}
	if n == 0 {
		return errors.New("socket error: 0 byte length recv'd")
	}
	if n > c.maxBytes {
		return errors.New("packet size was > maxBytes")
	}
	netData := &netcodeData{}
	netData.data = data[:n]
	netData.from = from
	// dispatch each packet on its own goroutine so the read loop never blocks
	go c.recvHandlerFn(netData)
	return nil
}
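
// note: buffers taken from xmitBuf are never Put back, so the pool never
// actually recycles anything and every packet still allocates. a minimal
// sketch of a fix, assuming handlers are done with the data once they
// return, would be to wrap the dispatch:
//
//	go func(d *netcodeData) {
//		c.recvHandlerFn(d)
//		c.xmitBuf.Put(d.data[:cap(d.data)])
//	}(netData)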
// runs the receiver until the connection is closed; the original version
// allocated a netcodeData channel here but never used it.
func (c *NetcodeConn) readLoop() {
	c.receiver()
}
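
// an alternative to the goroutine-per-packet dispatch in read() would be a
// small fixed pool of workers fed by a channel of *netcodeData, which is
// likely what the unused channel above was headed toward. a rough sketch,
// with the worker count and queue depth picked arbitrarily:
//
//	dataCh := make(chan *netcodeData, 1024)
//	for i := 0; i < 8; i++ {
//		go func() {
//			for d := range dataCh {
//				c.recvHandlerFn(d)
//			}
//		}()
//	}
//
// read() would then send on dataCh instead of spawning, and Close would
// close(dataCh) to shut the workers down.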
func main() {
	flag.Parse()
	buf := make([]byte, MAX_PACKET_BYTES)
	for i := 0; i < len(buf); i++ {
		buf[i] = byte(i)
	}
	if runProfile {
		p := profile.Start(profile.CPUProfile, profile.ProfilePath("."), profile.NoShutdownHook)
		defer p.Stop()
	}
	if serverMode {
		runServer(buf)
		return
	}
	wg := &sync.WaitGroup{}
	for i := 0; i < maxClients; i++ {
		wg.Add(1)
		go runClient(wg, buf, i)
	}
	wg.Wait()
	log.Printf("client total send/recv: %d/%d\n", clientTotalSend, clientTotalRecv)
}
func runServer(buf []byte) {
	conn := NewNetcodeConn()
	recvCount := make([]uint64, maxClients)
	// set our recv handler to just read the client id, increment its counter, and spew a buffer back to the client
	conn.SetRecvHandler(func(data *netcodeData) {
		// obviously this is dumb and you'd never use user input to index into a slice, but this is just a test.
		clientId := binary.LittleEndian.Uint16(data.data)
		atomic.AddUint64(&recvCount[clientId], 1)
		conn.WriteTo(buf, data.from)
	})
	if err := conn.Listen(addr); err != nil {
		log.Fatalf("error in listen: %s\n", err)
	}
	log.Printf("listening on %s\n", addr.String())
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)
	// wait for the good ol' ctrl+c
	<-c
	total := uint64(0)
	for i := 0; i < maxClients; i++ {
		log.Printf("clientId: %d recv'd/sent %d", i, recvCount[i])
		total += recvCount[i]
	}
	log.Printf("\ntotal: %d\n", total)
	conn.Close()
}
// run our client, sending packets at 10Hz
func runClient(wg *sync.WaitGroup, buf []byte, index int) {
	clientBuf := make([]byte, len(buf))
	copy(clientBuf, buf)
	// stamp our client id into the first two bytes so the server can identify us
	binary.LittleEndian.PutUint16(clientBuf[:2], uint16(index))
	doneTimer := time.NewTimer(time.Duration(runTime * float64(time.Second)))
	sendPacket := time.NewTicker(100 * time.Millisecond) // 10Hz
	sendCount := uint64(0)
	recvCount := uint64(0)
	conn := NewNetcodeConn()
	conn.SetRecvHandler(func(data *netcodeData) {
		atomic.AddUint64(&recvCount, 1)
	})
	// randomize start up of clients
	if randomizeStart {
		time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
	}
	if err := conn.Dial(addr); err != nil {
		log.Fatalf("error connecting: %s\n", err)
	}
	for {
		select {
		// time to send the packets!
		case <-sendPacket.C:
			if _, err := conn.Write(clientBuf); err != nil {
				log.Fatalf("error sending packets: %s\n", err)
			}
			atomic.AddUint64(&sendCount, 1)
		case <-doneTimer.C:
			sendPacket.Stop()
			doneTimer.Stop()
			// give any in-flight replies a moment to arrive before tallying up
			time.Sleep(500 * time.Millisecond)
			rxcnt := atomic.LoadUint64(&recvCount)
			txcnt := atomic.LoadUint64(&sendCount)
			log.Printf("client: %d recv'd: %d sent: %d\n", index, rxcnt, txcnt)
			atomic.AddUint64(&clientTotalRecv, rxcnt)
			atomic.AddUint64(&clientTotalSend, txcnt)
			wg.Done()
			return
		}
	}
}
@gafferongames

Try increasing SOCKET_BUF_SIZE? I've found it needs to be proportional to the # of clients on the server, but the client can have a smaller buffer size.

@wirepair (Author)

@gafferongames, wow, yeah, I multiplied SOCKET_BUF_SIZE by the # of clients and I can now get about 9000 clients with 0 packet loss!
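
For reference, the fix is visible in Listen above, which now sizes the socket buffers per expected client:

c.sendSize = SOCKET_BUF_SIZE * maxClients
c.recvSize = SOCKET_BUF_SIZE * maxClients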

@gafferongames

BAM!

@bschwind commented May 12, 2017

@wirepair I came across this on Twitter and thought it was a good example app to replicate to learn Rust/Tokio. I'm still smoothing out some things and figuring out the best way to do them, but here's what I have:

https://gist.github.com/bschwind/c2d9ab615a78f6370890f31f061b1a01

Repo here: https://github.com/bschwind/udp-stress-test/tree/master

You can run the server with cargo run --release -- -s. Currently the server is single-threaded... I'm curious to see how it performs on your machine with your Go clients running against it. I'm on a 2014 MacBook now and don't have a lot of RAM to spare; I was getting "no buffer space available" errors at ~4000-5000 clients.

Sorry, the client count and duration don't have command line args yet; they're defined near the top of the file. I also need to add a Ctrl-C handler to stop the server and print the statistics.
