|
const std = @import("std"); |
|
|
|
/// Program entry point.
///
/// Creates one epoll instance per worker (half the reported CPU count, at
/// least one), starts the `runServer` coroutine, spawns a detached thread
/// running `runEpoll` for every epoll instance except the first, and finally
/// runs `runEpoll` for the first instance on the calling thread.
///
/// Returns an error if allocating the descriptor table, creating an epoll
/// instance, or spawning a worker thread fails.
pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    // In the std version this file targets, `deinit` returns `true` when
    // leaks were detected, so the healthy outcome to assert on is `false`.
    defer std.debug.assert(!gpa.deinit());
    const allocator = &gpa.allocator;

    // Half of the reported CPUs, but never fewer than one; fall back to a
    // single worker when the CPU count cannot be queried.
    const num_threads = std.math.max(1, (std.Thread.getCpuCount() catch 1) / 2);
    const epolls = try allocator.alloc(std.os.fd_t, num_threads);
    defer allocator.free(epolls);

    // Count successfully created descriptors so that a failure part-way
    // through closes only the ones that actually exist.
    var epolls_created: usize = 0;
    defer for (epolls[0..epolls_created]) |epoll_fd|
        std.os.close(epoll_fd);
    for (epolls) |*epoll_fd| {
        epoll_fd.* = try std.os.epoll_create1(std.os.EPOLL_CLOEXEC);
        epolls_created += 1;
    }

    var server_frame = async runServer(epolls, allocator);
    // Report a server failure instead of silently discarding it.
    defer (nosuspend await server_frame) catch |err|
        std.debug.warn("server exited with error: {}\n", .{err});

    // Registered as a defer so the calling thread enters its event loop when
    // this scope exits, i.e. after the workers below have been spawned (or
    // when spawning one fails). `runEpoll` loops unconditionally, so the
    // defers registered above this one are not reached once it starts.
    defer runEpoll(epolls[0]);
    for (epolls[1..]) |epoll_fd| {
        const thread = try std.Thread.spawn(.{}, runEpoll, .{epoll_fd});
        thread.detach();
    }
}
|
|
|
/// A file descriptor registered with an epoll instance, plus two wait slots
/// (`reader` and `writer`) on which a coroutine can park itself.
///
/// Each slot holds `IS_EMPTY`, `IS_NOTIFIED`, or the address of a suspended
/// frame. Slots are accessed with plain loads and stores.
/// NOTE(review): if `waitOn` and `notify` can run on different threads for
/// the same slot, confirm whether this needs atomics.
const AsyncFd = struct {
    fd: std.os.fd_t,
    epoll_fd: std.os.fd_t,
    reader: usize = IS_EMPTY,
    writer: usize = IS_EMPTY,

    /// Slot value: nobody is waiting and no event is pending.
    const IS_EMPTY = 0;
    /// Slot value: an event arrived while nobody was waiting.
    const IS_NOTIFIED = 1;

    /// Initializes `self` in place and adds `fd` to `epoll_fd`, subscribing
    /// to readable, writable and peer-hangup events in edge-triggered mode.
    /// The address of `self` is stored as the event payload, so `self` must
    /// stay at the same address until `deinit` is called.
    fn init(self: *AsyncFd, fd: std.os.fd_t, epoll_fd: std.os.fd_t) !void {
        self.* = .{ .fd = fd, .epoll_fd = epoll_fd };

        const interest = std.os.EPOLLIN | std.os.EPOLLOUT | std.os.EPOLLET | std.os.EPOLLRDHUP;
        try std.os.epoll_ctl(epoll_fd, std.os.EPOLL_CTL_ADD, fd, &std.os.epoll_event{
            .events = interest,
            .data = .{ .ptr = @ptrToInt(self) },
        });
    }

    /// Removes the descriptor from its epoll instance. A failure of the
    /// removal is treated as impossible.
    fn deinit(self: *AsyncFd) void {
        std.os.epoll_ctl(self.epoll_fd, std.os.EPOLL_CTL_DEL, self.fd, null) catch unreachable;
    }

    /// Parks the calling coroutine on `waiter_ptr` unless the slot already
    /// holds something other than `IS_EMPTY`; in that case it returns
    /// immediately. The slot is reset to `IS_EMPTY` before returning.
    fn waitOn(waiter_ptr: *usize) void {
        if (waiter_ptr.* == IS_EMPTY) {
            // Publish the frame address from inside the suspend block so it
            // is only visible once this coroutine is resumable.
            suspend {
                waiter_ptr.* = @ptrToInt(@frame());
            }
        }
        waiter_ptr.* = IS_EMPTY;
    }

    /// Marks `waiter_ptr` as notified and resumes the coroutine parked on it,
    /// if there is one. Returns `true` only when a coroutine was resumed.
    fn notify(waiter_ptr: *usize) bool {
        const previous = waiter_ptr.*;
        waiter_ptr.* = IS_NOTIFIED;

        switch (previous) {
            // Nobody was parked here; the notification is just recorded.
            IS_EMPTY, IS_NOTIFIED => return false,
            else => {
                resume @intToPtr(anyframe, previous);
                return true;
            },
        }
    }
};
|
|
|
/// Event loop for a single epoll instance; never returns.
///
/// Blocks in `epoll_wait` with no timeout, then for every reported event
/// wakes the coroutine parked on the matching `AsyncFd` slot. Error and
/// hang-up conditions count as both readable and writable so that whichever
/// side is parked gets to observe the failure.
fn runEpoll(epoll_fd: std.os.fd_t) void {
    const read_mask: u32 = std.os.EPOLLIN | std.os.EPOLLERR | std.os.EPOLLHUP | std.os.EPOLLRDHUP;
    const write_mask: u32 = std.os.EPOLLOUT | std.os.EPOLLERR | std.os.EPOLLHUP;

    var event_buf: [256]std.os.epoll_event = undefined;
    while (true) {
        const ready_count = std.os.epoll_wait(epoll_fd, &event_buf, -1);
        for (event_buf[0..ready_count]) |event| {
            // The payload is the address stored by `AsyncFd.init`.
            const target = @intToPtr(*AsyncFd, event.data.ptr);
            const is_readable = event.events & read_mask != 0;
            const is_writable = event.events & write_mask != 0;

            // Once a coroutine has been resumed, `target` is not touched
            // again: `runClient` keeps its `AsyncFd` inside its own frame
            // and destroys that frame when it finishes.
            if (is_readable and AsyncFd.notify(&target.reader)) continue;
            if (is_writable) _ = AsyncFd.notify(&target.writer);
        }
    }
}
|
|
|
/// Accept loop coroutine.
///
/// Opens a non-blocking TCP listener on 127.0.0.1:12345, registers it with
/// the first epoll instance in `epolls`, and then accepts connections
/// forever. Each accepted connection gets a heap-allocated `runClient` frame
/// and is assigned to the epoll instances in round-robin order.
///
/// Returns an error if socket setup fails or `accept` fails with anything
/// other than `error.WouldBlock`. Failing to allocate a client frame is
/// logged and that one connection is closed; the loop keeps going.
fn runServer(epolls: []std.os.fd_t, allocator: *std.mem.Allocator) !void {
    const sock_flags = std.os.SOCK_NONBLOCK | std.os.SOCK_CLOEXEC;
    const listener = try std.os.socket(
        std.os.AF_INET,
        std.os.SOCK_STREAM | sock_flags,
        std.os.IPPROTO_TCP,
    );
    defer std.os.close(listener);

    const port = 12345;
    var listen_addr = comptime std.net.Address.parseIp("127.0.0.1", port) catch unreachable;
    const enable = std.mem.toBytes(@as(c_int, 1));
    try std.os.setsockopt(listener, std.os.SOL_SOCKET, std.os.SO_REUSEADDR, &enable);
    try std.os.bind(listener, &listen_addr.any, listen_addr.getOsSockLen());
    try std.os.listen(listener, 128);

    // Index of the epoll instance the next accepted client will use; the
    // listener itself is registered with the instance at index 0.
    var next_epoll: usize = 0;
    var listener_io: AsyncFd = undefined;
    try listener_io.init(listener, epolls[next_epoll]);
    defer listener_io.deinit();

    std.debug.warn("Listening on :{}\n", .{port});
    while (true) {
        const client_fd = std.os.accept(listener, null, null, sock_flags) catch |err| switch (err) {
            // Nothing pending: park until the listener is reported readable.
            error.WouldBlock => {
                AsyncFd.waitOn(&listener_io.reader);
                continue;
            },
            else => |e| return e,
        };

        const client_frame = allocator.create(@Frame(runClient)) catch |err| {
            std.debug.warn("failed to create client coroutine for {}: {}\n", .{ client_fd, err });
            std.os.close(client_fd);
            continue;
        };

        // The client coroutine owns both the socket and its own frame from
        // here on; it releases them itself when it finishes.
        client_frame.* = async runClient(client_fd, epolls[next_epoll], allocator);
        next_epoll = (next_epoll + 1) % epolls.len;
    }
}
|
|
|
/// Per-connection coroutine: answers every request whose header block ends
/// in "\r\n\r\n" with a fixed 200 response until the peer closes the
/// connection or an error occurs.
///
/// `client_fd` is owned by this coroutine and closed on exit. The coroutine
/// also destroys its own frame through `allocator` on exit, so the frame must
/// have been created with that allocator (as `runServer` does).
///
/// Returns `error.RequestTooLarge` when the 4096-byte buffer fills up without
/// containing a complete header block, or any error from `setsockopt`,
/// `epoll_ctl`, `send` or `read` other than `error.WouldBlock`. In the code
/// visible here nothing awaits this frame, so the error only ends the
/// connection.
fn runClient(client_fd: std.os.fd_t, epoll_fd: std.os.fd_t, allocator: *std.mem.Allocator) !void {
    defer {
        std.os.close(client_fd);
        // Free the frame from inside a suspend block: the coroutine is
        // suspended at this point and is never resumed afterwards.
        suspend allocator.destroy(@frame());
    }

    const SOL_TCP = 6;
    const TCP_NODELAY = 1;
    try std.os.setsockopt(client_fd, SOL_TCP, TCP_NODELAY, &std.mem.toBytes(@as(c_int, 1)));

    var async_fd: AsyncFd = undefined;
    try async_fd.init(client_fd, epoll_fd);
    defer async_fd.deinit();

    // Blank line that terminates a request's header block.
    const HEADER_END = "\r\n\r\n";
    const RESP = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\nContent-Type: text/plain; charset=utf8\r\n\r\nHelloWorld";

    var read_offset: usize = 0;
    var read_buf: [4096]u8 = undefined;
    while (true) {
        const req_buf = read_buf[0..read_offset];
        if (std.mem.indexOf(u8, req_buf, HEADER_END)) |parsed| {
            // Drop the consumed request and shift any following bytes
            // (e.g. a pipelined request) to the front of the buffer.
            const consumed = parsed + HEADER_END.len;
            std.mem.copy(u8, &read_buf, req_buf[consumed..]);
            read_offset -= consumed;

            var write_offset: usize = 0;
            while (write_offset < RESP.len) {
                write_offset += std.os.send(client_fd, RESP[write_offset..], std.os.MSG_NOSIGNAL) catch |err| switch (err) {
                    // Send buffer is full: park until the socket is writable.
                    error.WouldBlock => {
                        AsyncFd.waitOn(&async_fd.writer);
                        continue;
                    },
                    else => |e| return e,
                };
            }

            // Re-scan the buffer before reading more from the socket.
            continue;
        }

        // The buffer is full and still holds no complete header block.
        // Fail explicitly instead of reading into a zero-length slice and
        // mistaking the resulting 0 for the peer closing the connection.
        if (read_offset == read_buf.len) return error.RequestTooLarge;

        while (true) {
            const bytes = std.os.read(client_fd, read_buf[read_offset..]) catch |err| switch (err) {
                // No data yet: park until the socket is readable.
                error.WouldBlock => {
                    AsyncFd.waitOn(&async_fd.reader);
                    continue;
                },
                else => |e| return e,
            };

            // A zero-byte read means the peer closed the connection.
            if (bytes == 0) return;
            read_offset += bytes;
            break;
        }
    }
}
Slightly more idiomatic Go (using a `chan int` instead of a `chan struct{}` + atomic int):

I assume you want to keep the request parsing code equivalent between implementations, but if not, `receiver` can make use of `bufio.Scanner`:

Lastly, I noticed that each `wrk2` connection only has one request in flight at a time, so there's no benefit to spawning separate `sender` and `receiver` goroutines (i.e. `numReqs` is never greater than 1); unsurprisingly, merging them into one serial loop significantly improves performance:

Maybe I'm missing something though?