-
-
Save ayende/837553b6afe34730617f2487b757e31b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const std = @import("std"); | |
const builtin = @import("builtin"); | |
const os = std.os; | |
const mem = std.mem; | |
const testing = std.testing; | |
const IO_Uring = std.os.linux.IO_Uring; | |
/// Asynchronous file read/write queue backed by an io_uring instance and a
/// dedicated worker thread.
///
/// Callers hand a `Work` item to `submit`; the worker thread queues it on the
/// ring and, once a completion for it is reaped, invokes `Work.callback` from
/// the worker thread with `Work.result` filled in.
pub const PagerRing = struct {
    /// Number of entries requested from io_uring; also the size of the batch
    /// of completions copied out per worker iteration.
    const IoRingQueueSize = 32;
    const WorkReqStack = std.atomic.Stack(Work);

    /// Completion callback. It is invoked from the worker thread.
    pub const CallbackFn = fn (*Work) void;

    /// A single read or write request.
    pub const Work = struct {
        tag: enum { Read, Write },
        fd: std.os.fd_t,
        /// Remaining bytes to transfer; re-sliced after a partial completion.
        buffer: []u8,
        /// File offset of `buffer[0]`; advanced after a partial completion.
        offset: u64,
        /// Opaque caller value, passed through untouched to the callback.
        context: u64,
        result: struct {
            /// Total bytes transferred, accumulated across partial completions.
            bytes: u64,
            /// Negated completion result when the operation failed, or the
            /// integer value of the worker's error when the worker aborted.
            err: ?i32,
        },
        callback: CallbackFn,
    };

    ring: IO_Uring,
    allocator: std.mem.Allocator,
    /// eventfd used to wake the worker; it is also registered with the ring.
    event_fd: i32,
    worker: std.Thread,
    /// Set by the worker thread when it terminates with an error; once set,
    /// `submit` rejects new work with this error.
    background_error: ?anyerror,
    pending: WorkReqStack,
    /// Shared between `deinit` and the worker thread; accessed atomically.
    running: bool,

    /// Allocates a PagerRing, sets up the ring and eventfd, and starts the
    /// worker thread. The caller must release it with `deinit`.
    ///
    /// On failure every resource acquired so far is released again.
    pub fn init(allocator: std.mem.Allocator) !*PagerRing {
        const self = try allocator.create(PagerRing);
        errdefer allocator.destroy(self);

        self.background_error = null;
        self.allocator = allocator;
        self.pending = WorkReqStack.init();

        self.ring = try IO_Uring.init(IoRingQueueSize, 0);
        errdefer self.ring.deinit();

        self.event_fd = try std.os.eventfd(0, 0);
        // Without this the descriptor leaks when registration or the thread
        // spawn below fails.
        errdefer std.os.close(self.event_fd);

        try self.ring.register_eventfd(self.event_fd);

        self.running = true;
        self.worker = try std.Thread.spawn(.{}, background_worker_wrapper, .{self});
        return self;
    }

    /// Stops the worker thread, waits for it to exit and frees everything
    /// owned by the PagerRing, including `self`.
    ///
    /// Work items still sitting in `pending` are freed without their callback
    /// being invoked.
    pub fn deinit(self: *PagerRing) void {
        // The worker polls this flag from another thread, so publish the
        // change with an atomic store instead of a plain write.
        @atomicStore(bool, &self.running, false, .SeqCst);
        // Best effort: deinit cannot report failure. The worker is blocked in
        // wait_for_work and needs this write to notice the flag.
        self.wake_worker() catch {};
        self.worker.join();
        self.ring.deinit();
        while (self.pending.pop()) |node| {
            self.allocator.destroy(node);
        }
        std.os.close(self.event_fd);
        self.allocator.destroy(self);
    }

    /// Queues `work` for the worker thread and wakes it.
    ///
    /// Returns the worker's error if the worker has already failed, an
    /// allocation error if the queue node cannot be created, or the error
    /// from writing to the eventfd.
    pub fn submit(self: *PagerRing, work: Work) !void {
        if (self.background_error) |err| {
            return err;
        }
        const node = try self.allocator.create(WorkReqStack.Node);
        node.data = work;
        self.pending.push(node);
        try self.wake_worker();
    }

    /// Thread entry point. Runs the worker loop; if it fails, records the
    /// error and completes every pending work item with that error.
    fn background_worker_wrapper(self: *PagerRing) void {
        self.background_worker() catch |err| {
            self.background_error = err;
            while (self.pending.pop()) |node| {
                defer self.allocator.destroy(node);
                node.data.result.err = @errorToInt(err);
                node.data.callback(&node.data);
            }
        };
    }

    /// Worker loop: wait on the eventfd, move pending work onto the ring,
    /// submit it, then reap completions and run their callbacks.
    fn background_worker(self: *PagerRing) !void {
        var cqes = std.mem.zeroes([IoRingQueueSize]std.os.linux.io_uring_cqe);
        while (@atomicLoad(bool, &self.running, .SeqCst)) {
            try self.wait_for_work();

            // Set when work is left over for the next iteration, so that the
            // next wait_for_work returns instead of blocking.
            var should_wake: bool = false;

            while (self.pending.pop()) |node| {
                const op = switch (node.data.tag) {
                    .Read => IO_Uring.read,
                    .Write => IO_Uring.write,
                };
                _ = op(
                    &self.ring,
                    @ptrToInt(node), // user_data: recovered when the completion is reaped
                    node.data.fd,
                    node.data.buffer,
                    node.data.offset,
                ) catch |e| switch (e) {
                    error.SubmissionQueueFull => {
                        // No room right now: keep the node and retry later.
                        self.pending.push(node);
                        should_wake = true;
                        break;
                    },
                    else => {
                        // Hand the node back so the wrapper's drain loop
                        // reports the failure through its callback rather
                        // than dropping the request silently.
                        self.pending.push(node);
                        return e;
                    },
                };
            }

            _ = self.ring.submit() catch |err| switch (err) {
                error.CompletionQueueOvercommitted => should_wake = true,
                error.SignalInterrupt => should_wake = true,
                else => return err,
            };

            // Now process whatever has completed, without blocking.
            const n = try self.ring.copy_cqes(cqes[0..], 0);
            for (cqes[0..n]) |cqe| {
                const node = @intToPtr(*WorkReqStack.Node, cqe.user_data);
                if (cqe.res > 0 and cqe.res < node.data.buffer.len) {
                    // Partial operation: advance past the transferred bytes
                    // and queue the remainder again.
                    const bytes = @intCast(usize, cqe.res);
                    node.data.buffer = node.data.buffer[bytes..];
                    node.data.offset += bytes;
                    node.data.result.bytes += bytes;
                    self.pending.push(node);
                    should_wake = true;
                    continue;
                }
                defer self.allocator.destroy(node);
                if (cqe.res < 0) {
                    node.data.result.err = -cqe.res;
                } else {
                    node.data.result.bytes += @intCast(usize, cqe.res);
                }
                node.data.callback(&node.data);
            }

            if (should_wake) {
                try self.wake_worker();
            }
        }
    }

    /// Writes the value 1 to the eventfd, which unblocks `wait_for_work`.
    fn wake_worker(self: *PagerRing) !void {
        const w: u64 = 1;
        _ = try std.os.write(self.event_fd, std.mem.asBytes(&w));
    }

    /// Blocks reading the eventfd until something has been written to it.
    fn wait_for_work(self: *PagerRing) !void {
        var val: u64 = undefined;
        _ = try std.os.read(self.event_fd, std.mem.asBytes(&val));
    }
};
/// Outcome of one ring operation, shared between the submitting thread and
/// the `wake` completion callback.
const OpResult = struct {
    /// Futex word the submitter waits on; `wake` stores 1 into it.
    waiter: std.atomic.Atomic(u32),
    /// Copied from the work item's `result.bytes`.
    bytes: u64,
    /// Copied from the work item's `result.err`.
    err: ?i32,
};
/// Completion callback used by the test: copies the outcome of `work` into
/// the `OpResult` whose address is carried in `work.context`, marks it as
/// done and wakes the threads waiting on it.
fn wake(work: *PagerRing.Work) void {
    const outcome = work.result;
    const target: *OpResult = @intToPtr(*OpResult, work.context);

    target.bytes = outcome.bytes;
    target.err = outcome.err;

    // Set the flag first, then wake with the largest possible waiter count.
    const all_waiters: u32 = std.math.maxInt(u32);
    target.waiter.store(1, .SeqCst);
    std.Thread.Futex.wake(&target.waiter, all_waiters);
}
// End-to-end check of PagerRing: read a file's initial contents, overwrite
// them through the ring, then read them back.
test "can read and write values from ring" {
    // Futex.wait may return spuriously, so a single call does not prove the
    // callback has run. Keep waiting until `wake` has actually set the flag.
    const completion = struct {
        fn wait(op: *OpResult) !void {
            while (op.waiter.load(.SeqCst) == 0) {
                try std.Thread.Futex.wait(&op.waiter, 0, null);
            }
        }
    };

    var file = try std.fs.createFileAbsolute("/tmp/file-ring-test", .{ .truncate = true, .read = true });
    defer file.close();

    const msg = "hello world\n";
    try file.writeAll(msg);

    var ring = try PagerRing.init(testing.allocator);
    defer ring.deinit();

    var buffer: [32]u8 = undefined;
    var waiter: OpResult = std.mem.zeroInit(OpResult, .{});

    // 1) Read back what was written with the regular file API.
    try ring.submit(.{
        .tag = .Read,
        .buffer = &buffer,
        .offset = 0,
        .fd = file.handle,
        .context = @ptrToInt(&waiter),
        .callback = wake,
        .result = .{ .bytes = 0, .err = null },
    });
    try completion.wait(&waiter);
    try testing.expectEqual(msg.len, waiter.bytes);
    try testing.expect(null == waiter.err);
    try testing.expectEqualStrings(msg, buffer[0..waiter.bytes]);

    // 2) Overwrite the start of the file through the ring.
    waiter.waiter.storeUnchecked(0); // re-arm the flag; no operation is in flight
    const msg2 = "testing things out...";
    std.mem.copy(u8, &buffer, msg2);
    try ring.submit(.{
        .tag = .Write,
        .buffer = buffer[0..msg2.len],
        .offset = 0,
        .fd = file.handle,
        .context = @ptrToInt(&waiter),
        .callback = wake,
        .result = .{ .bytes = 0, .err = null },
    });
    try completion.wait(&waiter);
    try testing.expectEqual(msg2.len, waiter.bytes);
    try testing.expect(null == waiter.err);

    // 3) Clear the buffer and read the new contents back.
    std.mem.set(u8, &buffer, 0);
    waiter.waiter.storeUnchecked(0);
    try ring.submit(.{
        .tag = .Read,
        .buffer = &buffer,
        .offset = 0,
        .fd = file.handle,
        .context = @ptrToInt(&waiter),
        .callback = wake,
        .result = .{ .bytes = 0, .err = null },
    });
    try completion.wait(&waiter);
    try testing.expectEqual(msg2.len, waiter.bytes);
    try testing.expect(null == waiter.err);
    try testing.expectEqualStrings(msg2, buffer[0..waiter.bytes]);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment