@kprotty
Last active March 20, 2024 16:11

  • CPU: AMD Ryzen 5 2600 (6c, 12t)
  • RAM: 16GB DDR4 2933MHz
  • OS: Arch Linux Kernel 5.12.13
  • Command: wrk -c128 -t6 -d60 http://localhost:12345 --latency

Tokio

Running 30s test @ http://localhost:12345
  6 threads and 128 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   184.28us  218.44us  29.58ms   99.37%
    Req/Sec   112.76k     4.82k  126.20k    89.61%
  Latency Distribution
     50%  170.00us
     75%  211.00us
     90%  256.00us
     99%  365.00us
  20196282 requests in 30.06s, 1.67GB read
Requests/sec: 671923.50
Transfer/sec:     57.03MB

Golang

Running 30s test @ http://localhost:12345
  6 threads and 128 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   171.69us  458.50us  44.62ms   99.24%
    Req/Sec   132.66k     6.81k  146.83k    85.92%
  Latency Distribution
     50%  126.00us
     75%  197.00us
     90%  277.00us
     99%  523.00us
  23812653 requests in 30.10s, 1.97GB read
Requests/sec: 791098.35
Transfer/sec:     67.15MB

Epoll per thread

Running 30s test @ http://localhost:12345
  6 threads and 128 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency   125.36us  717.00us  44.57ms   99.36%
    Req/Sec   136.90k     7.22k  176.53k    70.44%
  Latency Distribution
     50%   77.00us
     75%   86.00us
     90%  145.00us
     99%  227.00us
  24561778 requests in 30.10s, 2.04GB read
Requests/sec: 816026.54
Transfer/sec:     69.26MB

Epoll per thread (Zig source)

const std = @import("std");

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer std.debug.assert(gpa.deinit());
    const allocator = &gpa.allocator;

    const num_threads = std.math.max(1, (std.Thread.getCpuCount() catch 1) / 2);
    const epolls = try allocator.alloc(std.os.fd_t, num_threads);
    defer allocator.free(epolls);

    var epolls_created: usize = 0;
    defer for (epolls[0..epolls_created]) |epoll_fd|
        std.os.close(epoll_fd);
    for (epolls) |*epoll_fd| {
        epoll_fd.* = try std.os.epoll_create1(std.os.EPOLL_CLOEXEC);
        epolls_created += 1;
    }

    var server_frame = async runServer(epolls, allocator);
    defer (nosuspend await server_frame) catch {};

    // The main thread runs the first epoll loop (via defer) once the worker threads are spawned.
    defer runEpoll(epolls[0]);
    for (epolls[1..]) |epoll_fd| {
        const thread = try std.Thread.spawn(.{}, runEpoll, .{epoll_fd});
        thread.detach();
    }
}

const AsyncFd = struct {
    fd: std.os.fd_t,
    epoll_fd: std.os.fd_t,
    // Each slot holds IS_EMPTY, IS_NOTIFIED, or the address of a suspended async frame.
    reader: usize = IS_EMPTY,
    writer: usize = IS_EMPTY,

    const IS_EMPTY = 0;
    const IS_NOTIFIED = 1;

    fn init(self: *AsyncFd, fd: std.os.fd_t, epoll_fd: std.os.fd_t) !void {
        self.* = .{
            .fd = fd,
            .epoll_fd = epoll_fd,
        };
        try std.os.epoll_ctl(epoll_fd, std.os.EPOLL_CTL_ADD, fd, &std.os.epoll_event{
            .data = .{ .ptr = @ptrToInt(self) },
            .events = std.os.EPOLLIN | std.os.EPOLLOUT | std.os.EPOLLET | std.os.EPOLLRDHUP,
        });
    }

    fn deinit(self: *AsyncFd) void {
        std.os.epoll_ctl(self.epoll_fd, std.os.EPOLL_CTL_DEL, self.fd, null) catch unreachable;
    }

    fn waitOn(waiter_ptr: *usize) void {
        defer waiter_ptr.* = IS_EMPTY;
        if (waiter_ptr.* == IS_EMPTY) {
            suspend waiter_ptr.* = @ptrToInt(@frame());
        }
    }

    fn notify(waiter_ptr: *usize) bool {
        const frame = waiter_ptr.*;
        waiter_ptr.* = IS_NOTIFIED;
        if (frame <= IS_NOTIFIED) return false;
        resume @intToPtr(anyframe, frame);
        return true;
    }
};

fn runEpoll(epoll_fd: std.os.fd_t) void {
    var events: [256]std.os.epoll_event = undefined;
    while (true) {
        const found = std.os.epoll_wait(epoll_fd, &events, -1);
        for (events[0..found]) |ev| {
            const async_fd = @intToPtr(*AsyncFd, ev.data.ptr);
            // Wake at most one waiter per event: the reader first, then the writer.
            var resumed = false;
            if (!resumed and (ev.events & (std.os.EPOLLIN | std.os.EPOLLERR | std.os.EPOLLHUP | std.os.EPOLLRDHUP) != 0))
                resumed = AsyncFd.notify(&async_fd.reader);
            if (!resumed and (ev.events & (std.os.EPOLLOUT | std.os.EPOLLERR | std.os.EPOLLHUP) != 0))
                resumed = AsyncFd.notify(&async_fd.writer);
        }
    }
}

fn runServer(epolls: []std.os.fd_t, allocator: *std.mem.Allocator) !void {
    const sock_flags = std.os.SOCK_NONBLOCK | std.os.SOCK_CLOEXEC;
    const server_fd = try std.os.socket(std.os.AF_INET, std.os.SOCK_STREAM | sock_flags, std.os.IPPROTO_TCP);
    defer std.os.close(server_fd);

    const port = 12345;
    var addr = comptime std.net.Address.parseIp("127.0.0.1", port) catch unreachable;
    try std.os.setsockopt(server_fd, std.os.SOL_SOCKET, std.os.SO_REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try std.os.bind(server_fd, &addr.any, addr.getOsSockLen());
    try std.os.listen(server_fd, 128);

    var epoll_index: usize = 0;
    var async_fd: AsyncFd = undefined;
    try async_fd.init(server_fd, epolls[epoll_index]);
    defer async_fd.deinit();

    std.debug.warn("Listening on :{}\n", .{port});
    while (true) {
        const client_fd = std.os.accept(server_fd, null, null, sock_flags) catch |err| switch (err) {
            else => |e| return e,
            error.WouldBlock => {
                AsyncFd.waitOn(&async_fd.reader);
                continue;
            },
        };

        // Hand each new client to the epoll instances round-robin.
        if (allocator.create(@Frame(runClient))) |client_frame| {
            client_frame.* = async runClient(client_fd, epolls[epoll_index], allocator);
            epoll_index = (epoll_index + 1) % epolls.len;
        } else |err| {
            std.debug.warn("failed to create client coroutine for {}: {}\n", .{ client_fd, err });
            std.os.close(client_fd);
        }
    }
}

fn runClient(client_fd: std.os.fd_t, epoll_fd: std.os.fd_t, allocator: *std.mem.Allocator) !void {
    defer {
        std.os.close(client_fd);
        suspend allocator.destroy(@frame());
    }

    const SOL_TCP = 6;
    const TCP_NODELAY = 1;
    try std.os.setsockopt(client_fd, SOL_TCP, TCP_NODELAY, &std.mem.toBytes(@as(c_int, 1)));

    var async_fd: AsyncFd = undefined;
    try async_fd.init(client_fd, epoll_fd);
    defer async_fd.deinit();

    var read_offset: usize = 0;
    var read_buf: [4096]u8 = undefined;
    while (true) {
        const CRLF = "\r\n\r\n";
        const req_buf = read_buf[0..read_offset];
        if (std.mem.indexOf(u8, req_buf, CRLF)) |parsed| {
            std.mem.copy(u8, &read_buf, req_buf[parsed + CRLF.len..]);
            read_offset -= parsed + CRLF.len;

            const RESP = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\nContent-Type: text/plain; charset=utf8\r\n\r\nHelloWorld";
            var write_offset: usize = 0;
            while (write_offset < RESP.len) {
                write_offset += std.os.send(client_fd, RESP[write_offset..], std.os.MSG_NOSIGNAL) catch |err| switch (err) {
                    else => |e| return e,
                    error.WouldBlock => {
                        AsyncFd.waitOn(&async_fd.writer);
                        continue;
                    },
                };
            }
            continue;
        }

        while (true) {
            const bytes = std.os.read(client_fd, read_buf[read_offset..]) catch |err| switch (err) {
                else => |e| return e,
                error.WouldBlock => {
                    AsyncFd.waitOn(&async_fd.reader);
                    continue;
                },
            };
            if (bytes == 0) return;
            read_offset += bytes;
            break;
        }
    }
}

Golang (source)

package main

import (
	"bufio"
	"bytes"
	"log"
	"net"
	"runtime"
)

func main() {
	runtime.GOMAXPROCS(runtime.GOMAXPROCS(0) / 2)

	listener, err := net.Listen("tcp4", "localhost:12345")
	if err != nil {
		log.Fatal(err)
	}
	defer listener.Close()
	log.Println("Listening on", listener.Addr())

	server := listener.(*net.TCPListener)
	for {
		client, err := server.AcceptTCP()
		if err != nil {
			log.Fatal(err)
		} else {
			go runClient(client)
		}
	}
}

func runClient(client *net.TCPConn) {
	defer client.Close()

	if err := client.SetNoDelay(true); err != nil {
		log.Println(err)
		return
	}

	scanner := bufio.NewScanner(client)
	scanner.Buffer(make([]byte, 4096), 4096)
	scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) {
		if atEOF && len(data) == 0 {
			return 0, nil, nil
		} else if i := bytes.Index(data, []byte("\r\n\r\n")); i >= 0 {
			return i + 4, data[:i], nil
		} else if atEOF {
			return len(data), data, nil
		}
		return 0, nil, nil
	})

	response := []byte("HTTP/1.1 200 Ok\r\nContent-Length: 10\r\nContent-Type: text/plain; charset=utf8\r\n\r\nHelloWorld")
	for scanner.Scan() {
		if _, err := client.Write(response); err != nil {
			return
		}
	}
}

Tokio (Rust source)

// [dependencies]
// num_cpus = "1"
// tokio = { version = "1", features = ["net", "io-util", "rt-multi-thread"] }
//
// [profile.release]
// lto = true
// panic = "abort"
// codegen-units = 1

pub fn main() -> Result<(), Box<dyn std::error::Error>> {
    tokio::runtime::Builder::new_multi_thread()
        .worker_threads((num_cpus::get() / 2).max(1))
        .enable_io()
        .build()?
        .block_on(async {
            let server = tokio::net::TcpListener::bind("127.0.0.1:12345").await.unwrap();
            println!("Listening on :12345");
            loop {
                let (mut client, _) = server.accept().await.unwrap();
                match client.set_nodelay(true) {
                    Ok(_) => {},
                    Err(_) => continue,
                }
                tokio::spawn(async move {
                    use tokio::io::{AsyncReadExt, AsyncWriteExt};
                    let mut read_offset = 0;
                    let mut read_buffer = [0; 4096];
                    const RESPONSE: &'static [u8] = b"HTTP/1.1 200 Ok\r\nContent-Length: 10\r\nContent-Type: text/plain; charset=utf8\r\n\r\nHelloWorld";
                    loop {
                        let needle = b"\r\n\r\n";
                        while let Some(parsed) = read_buffer[..read_offset].windows(needle.len()).position(|window| window == needle) {
                            let parsed = parsed + needle.len();
                            read_buffer.copy_within(parsed.., 0);
                            read_offset -= parsed;
                            client.write_all(RESPONSE).await?;
                        }
                        match client.read(&mut read_buffer[read_offset..]).await {
                            Ok(n) if n != 0 => read_offset += n,
                            e => return e,
                        }
                    }
                });
            }
        });
    Ok(())
}

@lukechampine

Slightly more idiomatic Go (using a chan int instead of a chan struct{} + atomic int):

package main

import (
	"bytes"
	"log"
	"net"
	"runtime"
)

func main() {
	runtime.GOMAXPROCS(6)

	l, err := net.Listen("tcp4", ":12345")
	if err != nil {
		log.Fatal(err)
	}
	defer l.Close()
	log.Println("Listening on", l.Addr())

	server := l.(*net.TCPListener)
	for {
		client, err := server.AcceptTCP()
		if err != nil {
			log.Fatal(err)
		} else if err := client.SetNoDelay(true); err != nil {
			log.Fatal(err)
		}

		requests := make(chan int, 1)
		go sender(client, requests)
		go receiver(client, requests)
	}
}

func sender(client *net.TCPConn, requests <-chan int) {
	response := []byte("HTTP/1.1 200 Ok\r\nContent-Length: 10\r\nContent-Type: text/plain; charset=utf8\r\nDate: Thu, 19 Nov 2020 14:26:34 GMT\r\nServer: customgo\r\n\r\nHelloWorld")
	for reqs := range requests {
		for ; reqs > 0; reqs-- {
			if _, err := client.Write(response); err != nil {
				return
			}
		}
	}
}

func receiver(client *net.TCPConn, requests chan<- int) {
	defer close(requests)
	buffer := make([]byte, 4096)
	numReqs := 0
	offset := 0
	for {
		if i := bytes.Index(buffer[:offset], []byte("\r\n\r\n")); i != -1 {
			i += 4
			copy(buffer, buffer[i:offset])
			offset -= i

			numReqs++
			select {
			case requests <- numReqs:
				numReqs = 0
			default:
			}

			continue
		}

		n, err := client.Read(buffer[offset:])
		if err != nil || n == 0 {
			break
		}
		offset += n
	}
}

I assume you want to keep the request parsing code equivalent between implementations, but if not, receiver can make use of bufio.Scanner:

func scanCRLF(data []byte, atEOF bool) (advance int, token []byte, err error) {
	if atEOF && len(data) == 0 {
		return 0, nil, nil
	} else if i := bytes.Index(data, []byte("\r\n\r\n")); i >= 0 {
		return i + 4, data[:i], nil
	} else if atEOF {
		return len(data), data, nil
	}
	return 0, nil, nil
}

func receiver(client *net.TCPConn, requests chan<- int) {
	defer close(requests)
	s := bufio.NewScanner(client)
	s.Buffer(make([]byte, 4096), 4096)
	s.Split(scanCRLF)
	numReqs := 0
	for s.Scan() {
		numReqs++
		select {
		case requests <- numReqs:
			numReqs = 0
		default:
		}
	}
}

Lastly, I noticed that each wrk2 connection only has one request in flight at a time, so there's no benefit to spawning separate sender and receiver goroutines (i.e. numReqs is never greater than 1); unsurprisingly, merging them into one serial loop significantly improves performance:

func runClient(client *net.TCPConn) {
	response := []byte("HTTP/1.1 200 Ok\r\nContent-Length: 10\r\nContent-Type: text/plain; charset=utf8\r\nDate: Thu, 19 Nov 2020 14:26:34 GMT\r\nServer: customgo\r\n\r\nHelloWorld")
	s := bufio.NewScanner(client)
	s.Buffer(make([]byte, 4096), 4096)
	s.Split(scanCRLF)
	for s.Scan() {
		if _, err := client.Write(response); err != nil {
			return
		}
	}
}

Maybe I'm missing something though?

@kprotty (Author) commented Jul 14, 2021

@lukechampine thanks for the feedback. I've incorporated your suggestions here and did notice a large throughput bump. Is this a solid change?

package main

import (
	"bufio"
	"bytes"
	"log"
	"net"
	"runtime"
)

func main() {
	runtime.GOMAXPROCS(runtime.GOMAXPROCS(0) / 2)

	listener, err := net.Listen("tcp4", "localhost:12345")
	if err != nil {
		log.Fatal(err)
	}

	defer listener.Close()
	log.Println("Listening on", listener.Addr())

	server := listener.(*net.TCPListener)
	for {
		client, err := server.AcceptTCP()
		if err != nil {
			log.Fatal(err)
		} else {
			go runClient(client)
		}
	}
}

func runClient(client *net.TCPConn) {
	defer client.Close()

	if err := client.SetNoDelay(true); err != nil {
		log.Println(err)
		return
	}

	reader := bufio.NewScanner(client)
	reader.Buffer(make([]byte, 4096), 4096)
	reader.Split(func (data []byte, atEOF bool) (advance int, token []byte, err error) {
		if atEOF && len(data) == 0 {
			return 0, nil, nil
		} else if i := bytes.Index(data, []byte("\r\n\r\n")); i >= 0 {
			return i + 4, data[:i], nil
		} else if atEOF {
			return len(data), data, nil
		}
		return 0, nil, nil
	})

	response := []byte("HTTP/1.1 200 Ok\r\nContent-Length: 10\r\nContent-Type: text/plain; charset=utf8\r\n\r\nHelloWorld")
	for reader.Scan() {
		if _, err := client.Write(response); err != nil {
			return
		}
	}
}

@lukechampine

Sure, looks good to me. Go tends to use shorter variable names (and it's a little odd to call a scanner a reader) but functionally it should be fine.
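
Concretely, that rename is purely cosmetic; a quick sketch of the same loop with the conventional short scanner name, reusing scanCRLF from the earlier comment (untested):

	// Same behavior as kprotty's runClient loop above; only the variable name
	// changes, and the anonymous split func is assumed hoisted out as scanCRLF.
	s := bufio.NewScanner(client)
	s.Buffer(make([]byte, 4096), 4096)
	s.Split(scanCRLF)
	for s.Scan() {
		if _, err := client.Write(response); err != nil {
			return
		}
	}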
