Skip to content

Instantly share code, notes, and snippets.

@ayende
Created December 31, 2021 13:01
Show Gist options
  • Save ayende/837553b6afe34730617f2487b757e31b to your computer and use it in GitHub Desktop.
Save ayende/837553b6afe34730617f2487b757e31b to your computer and use it in GitHub Desktop.
const std = @import("std");
const builtin = @import("builtin");
const os = std.os;
const mem = std.mem;
const testing = std.testing;
const IO_Uring = std.os.linux.IO_Uring;
/// Asynchronous positional file I/O backed by a Linux io_uring instance and a
/// single background worker thread.
///
/// Callers hand a `Work` request to `submit`; the worker thread prepares it on
/// the ring, resubmits partial transfers, and finally invokes the request's
/// callback (on the worker thread) with `result` filled in.
///
/// An eventfd is used as the worker's wake-up channel: `submit` and `deinit`
/// write to it, and it is also registered with the ring via
/// `register_eventfd`, so the worker blocks in one place (`wait_for_work`).
///
/// NOTE(review): `background_error` is written by the worker thread and read
/// by `submit` without synchronisation; a request submitted while the worker
/// is failing may stay queued until `deinit` frees it without a callback.
/// NOTE(review): requests that are already in flight in the ring when the
/// worker stops (shutdown or fatal error) are not reaped by this code.
pub const PagerRing = struct {
    /// Number of entries requested for the ring; also the size of the
    /// completion batch buffer used by the worker.
    const IoRingQueueSize = 32;
    /// Stack of queued requests, pushed by `submit` and popped by the worker.
    const WorkReqStack = std.atomic.Stack(Work);

    /// Completion callback; runs on the worker thread.
    pub const CallbackFn = fn (*Work) void;

    /// One read or write request.
    pub const Work = struct {
        /// Which operation to perform.
        tag: enum { Read, Write },
        /// File descriptor to operate on.
        fd: std.os.fd_t,
        /// Destination (Read) or source (Write). The copy held by the worker is
        /// advanced past bytes already transferred when a partial result is
        /// resubmitted.
        buffer: []u8,
        /// File offset of `buffer[0]`; advanced together with `buffer`.
        offset: u64,
        /// Opaque caller value; never touched by the ring, only handed back to
        /// the callback.
        context: u64,
        result: struct {
            /// Total bytes transferred, accumulated across partial completions.
            bytes: u64,
            /// Null on success. Holds the negated completion result when the
            /// kernel reports a failure, or `@errorToInt` of the worker's error
            /// when the worker itself failed.
            err: ?i32,
        },
        /// Invoked exactly once by the worker when the request finishes.
        callback: CallbackFn,
    };

    ring: IO_Uring,
    allocator: std.mem.Allocator,
    event_fd: i32,
    worker: std.Thread,
    background_error: ?anyerror,
    pending: WorkReqStack,
    running: bool,

    /// Allocates a PagerRing, sets up the ring and its eventfd, and starts the
    /// worker thread. On any failure everything acquired so far is released.
    /// The returned pointer must be released with `deinit`.
    pub fn init(allocator: std.mem.Allocator) !*PagerRing {
        var self = try allocator.create(PagerRing);
        errdefer allocator.destroy(self);
        self.background_error = null;
        self.allocator = allocator;
        self.pending = WorkReqStack.init();
        self.ring = try IO_Uring.init(IoRingQueueSize, 0);
        errdefer self.ring.deinit();
        self.event_fd = try std.os.eventfd(0, 0);
        // Without this the descriptor leaks when registration or thread
        // creation below fails.
        errdefer std.os.close(self.event_fd);
        try self.ring.register_eventfd(self.event_fd);
        // Set before the thread exists, so a plain store is sufficient here.
        self.running = true;
        self.worker = try std.Thread.spawn(.{}, background_worker_wrapper, .{self});
        return self;
    }

    /// Stops the worker thread, tears down the ring, frees requests that were
    /// still queued (their callbacks are NOT invoked) and destroys `self`.
    pub fn deinit(self: *PagerRing) void {
        // The worker polls this flag from another thread, so publish it atomically.
        @atomicStore(bool, &self.running, false, .SeqCst);
        // Best effort: deinit cannot report failure. The worker is blocked on
        // the eventfd, so it needs this write to observe the flag.
        self.wake_worker() catch {};
        self.worker.join();
        self.ring.deinit();
        while (self.pending.pop()) |node| {
            self.allocator.destroy(node);
        }
        std.os.close(self.event_fd);
        self.allocator.destroy(self);
    }

    /// Queues `work` for the worker thread and wakes it.
    ///
    /// Returns the worker's error if it has already failed, an allocation
    /// error, or the error from writing to the eventfd. In the last case the
    /// request is already queued, so its callback may still run later.
    pub fn submit(self: *PagerRing, work: Work) !void {
        if (self.background_error) |err| {
            return err;
        }
        const node = try self.allocator.create(WorkReqStack.Node);
        node.data = work;
        self.pending.push(node);
        try self.wake_worker();
    }

    /// Thread entry point. Runs the worker loop; if it fails, records the error
    /// and completes every queued request with that error so callers waiting on
    /// a callback are released.
    fn background_worker_wrapper(self: *PagerRing) void {
        self.background_worker() catch |err| {
            self.background_error = err;
            while (self.pending.pop()) |node| {
                defer self.allocator.destroy(node);
                node.data.result.err = @errorToInt(err);
                node.data.callback(&node.data);
            }
        };
    }

    /// Worker loop: wait for a wake-up, move queued requests onto the ring,
    /// submit them, then handle whatever completions are available.
    fn background_worker(self: *PagerRing) !void {
        var cqes = std.mem.zeroes([IoRingQueueSize]std.os.linux.io_uring_cqe);
        while (@atomicLoad(bool, &self.running, .SeqCst)) {
            try self.wait_for_work();
            // Set when there is known leftover work, so the next wait returns
            // immediately instead of blocking.
            var should_wake: bool = false;
            while (self.pending.pop()) |node| {
                const op = switch (node.data.tag) {
                    .Read => IO_Uring.read,
                    .Write => IO_Uring.write,
                };
                // The node address is the user_data that identifies the
                // request when its completion comes back.
                _ = op(
                    &self.ring,
                    @ptrToInt(node),
                    node.data.fd,
                    node.data.buffer,
                    node.data.offset,
                ) catch |e| switch (e) {
                    error.SubmissionQueueFull => {
                        // No room right now: requeue and retry on the next pass.
                        self.pending.push(node);
                        should_wake = true;
                        break;
                    },
                    else => {
                        // Requeue rather than free: the wrapper's failure path
                        // then reports the error through the callback instead
                        // of leaving the caller waiting forever.
                        self.pending.push(node);
                        return e;
                    },
                };
            }
            _ = self.ring.submit() catch |err| switch (err) {
                error.CompletionQueueOvercommitted => should_wake = true,
                error.SignalInterrupt => should_wake = true,
                else => return err,
            };
            // Now process the completed values (wait_nr = 0: do not block).
            const n = try self.ring.copy_cqes(cqes[0..], 0);
            for (cqes[0..n]) |cqe| {
                const node = @intToPtr(*WorkReqStack.Node, cqe.user_data);
                if (cqe.res > 0 and cqe.res < node.data.buffer.len) {
                    // Partial operation: advance past what was transferred and
                    // resubmit the remainder.
                    const bytes = @intCast(usize, cqe.res);
                    node.data.buffer = node.data.buffer[bytes..];
                    node.data.offset += bytes;
                    node.data.result.bytes += bytes;
                    self.pending.push(node);
                    should_wake = true;
                    continue;
                }
                defer self.allocator.destroy(node);
                if (cqe.res < 0) {
                    node.data.result.err = -cqe.res;
                } else {
                    node.data.result.bytes += @intCast(usize, cqe.res);
                }
                node.data.callback(&node.data);
            }
            if (should_wake) {
                try self.wake_worker();
            }
        }
    }

    /// Adds 1 to the eventfd counter so a blocked `wait_for_work` returns.
    fn wake_worker(self: *PagerRing) !void {
        const increment: u64 = 1;
        _ = try std.os.write(self.event_fd, std.mem.asBytes(&increment));
    }

    /// Blocks reading the eventfd; the value read is discarded.
    fn wait_for_work(self: *PagerRing) !void {
        var val: u64 = undefined;
        _ = try std.os.read(self.event_fd, std.mem.asBytes(&val));
    }
};
/// Completion record shared between a waiting thread and the `wake` callback.
const OpResult = struct {
    /// Futex word: `wake` stores 1 here and then wakes any waiters.
    waiter: std.atomic.Atomic(u32),
    /// Copy of the finished request's `result.bytes`.
    bytes: u64,
    /// Copy of the finished request's `result.err`.
    err: ?i32,
};
/// Completion callback for `PagerRing` requests whose `context` carries the
/// address of an `OpResult`: copies the request's outcome into that record,
/// sets the futex word to 1 and wakes every thread waiting on it.
fn wake(work: *PagerRing.Work) void {
    const completion = @intToPtr(*OpResult, work.context);
    const outcome = work.result;
    completion.err = outcome.err;
    completion.bytes = outcome.bytes;
    // Publish the flag only after the fields above are filled in.
    completion.waiter.store(1, .SeqCst);
    std.Thread.Futex.wake(&completion.waiter, std.math.maxInt(u32));
}
/// Test helper: resets `completion`, submits `work` on `ring`, and blocks until
/// the `wake` callback has set the futex word. `work.context` must hold the
/// address of `completion` and `work.callback` must be `wake`.
fn submit_and_wait(ring: *PagerRing, completion: *OpResult, work: PagerRing.Work) !void {
    completion.waiter.store(0, .SeqCst);
    try ring.submit(work);
    // Futex.wait may return spuriously, so re-check the word before trusting
    // that the callback has run.
    while (completion.waiter.load(.SeqCst) == 0) {
        try std.Thread.Futex.wait(&completion.waiter, 0, null);
    }
}

// Round trip through the ring: read the initial contents, overwrite them, and
// read the new contents back.
test "can read and write values from ring" {
    // A per-test temporary directory avoids clashing on a fixed /tmp path and
    // removes the file afterwards.
    var tmp = testing.tmpDir(.{});
    defer tmp.cleanup();
    var file = try tmp.dir.createFile("file-ring-test", .{ .truncate = true, .read = true });
    defer file.close();

    const msg = "hello world\n";
    try file.writeAll(msg);

    const ring = try PagerRing.init(testing.allocator);
    defer ring.deinit();

    var buffer: [32]u8 = undefined;
    var waiter: OpResult = std.mem.zeroInit(OpResult, .{});

    // The buffer is larger than the file, so the read comes back short and the
    // reported byte count is the file length.
    try submit_and_wait(ring, &waiter, .{
        .tag = .Read,
        .buffer = &buffer,
        .offset = 0,
        .fd = file.handle,
        .context = @ptrToInt(&waiter),
        .callback = wake,
        .result = .{ .bytes = 0, .err = null },
    });
    try testing.expectEqual(msg.len, waiter.bytes);
    try testing.expect(null == waiter.err);
    try testing.expectEqualStrings(msg, buffer[0..waiter.bytes]);

    const msg2 = "testing things out...";
    std.mem.copy(u8, &buffer, msg2);
    try submit_and_wait(ring, &waiter, .{
        .tag = .Write,
        .buffer = buffer[0..msg2.len],
        .offset = 0,
        .fd = file.handle,
        .context = @ptrToInt(&waiter),
        .callback = wake,
        .result = .{ .bytes = 0, .err = null },
    });
    try testing.expectEqual(msg2.len, waiter.bytes);
    try testing.expect(null == waiter.err);

    // Clear the buffer so the final comparison can only pass if the data really
    // came back from the file.
    std.mem.set(u8, &buffer, 0);
    try submit_and_wait(ring, &waiter, .{
        .tag = .Read,
        .buffer = &buffer,
        .offset = 0,
        .fd = file.handle,
        .context = @ptrToInt(&waiter),
        .callback = wake,
        .result = .{ .bytes = 0, .err = null },
    });
    try testing.expectEqual(msg2.len, waiter.bytes);
    try testing.expect(null == waiter.err);
    try testing.expectEqualStrings(msg2, buffer[0..waiter.bytes]);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment