Skip to content

Instantly share code, notes, and snippets.

@ayende
Last active Dec 30, 2021
Embed
What would you like to do?
const std = @import("std");
const builtin = @import("builtin");
const os = std.os;
const mem = std.mem;
const testing = std.testing;
pub const Page = struct {
page: u64,
numberOfPages: u32,
buffer: []u8,
};
pub const FileChunks = struct {
pub const MaxFileSize = 8 * 1024 * 1024 * 1024; // 8 GB
pub const ChunkSize = 2 * 1024 * 1024; // 2 MB
pub const MaxChunksInFile = MaxFileSize / ChunkSize; // 4096
pub const PageSize = 8 * 1024; // 8 KB
pub const PagesInChunk = ChunkSize / PageSize; // 256
pub const ChunkMetadata = packed union {
pub const Tag = enum(u2) {
Empty = 0b00,
Error = 0b01,
Loading = 0b10,
Value = 0b11,
};
raw: u64,
futex: packed struct {
value: u32, // tag & version
references: u32, // ignored
pub fn val(self: *@This()) *std.atomic.Atomic(u32) {
return @ptrCast(*std.atomic.Atomic(u32), @alignCast(@alignOf(u32), &self.value));
}
},
v: packed struct {
version: u30,
tag: Tag,
// references == 0 - this is unused
// references == 1 - just the pager is holding this
// refereces >= 2 - external entity is holding this
references: u32,
},
pub fn getTag(self: *ChunkMetadata) Tag {
return self.v.tag;
}
pub fn setTag(self: *ChunkMetadata, tag: Tag) void {
self.v.version +%= 1;
self.v.tag = tag;
}
pub fn release(self: *ChunkMetadata) !void {
self.v.version +%= 1;
if (self.v.references == 0) {
return error.ChunkReferenceCountUnderfow;
}
self.v.references -= 1;
}
pub fn addRef(self: *ChunkMetadata) !void {
comptime var maxVal = std.math.maxInt(@TypeOf(self.v.references));
if (@intCast(u64, self.v.references) + 1 > maxVal) {
return error.ChunkReferenceCountOverflow;
}
self.v.references += 1;
self.v.version +%= 1;
}
pub fn wait(self: *ChunkMetadata, version: ChunkMetadata, timeout: ?u64) !void {
try std.Thread.Futex.wait(self.futex.val(), version.futex.value, timeout);
}
pub fn tryUpdate(self: *ChunkMetadata, origin: ChunkMetadata, newVal: ChunkMetadata) bool {
if (@cmpxchgWeak(u64, &self.raw, origin.raw, newVal.raw, .Monotonic, .Monotonic) == null) {
std.Thread.Futex.wake(self.futex.val(), std.math.maxInt(u32));
return true;
}
return false;
}
pub fn mayReclaim(self: *ChunkMetadata) bool {
return self.v.references == 1 and self.v.tag == .Value;
}
pub fn reset(self: *ChunkMetadata, tag: Tag) ChunkMetadata {
return ChunkMetadata{ .v = .{
.tag = tag,
.version = self.v.version +% 1,
.references = 0,
} };
}
};
comptime {
if (@sizeOf(ChunkMetadata) != @sizeOf(u64)) {
@compileError("ChunkMetadata should be exactly 64 bits in length");
}
}
chunks: [MaxChunksInFile]ChunkMetadata,
ptr: []align(mem.page_size) u8,
allocator: mem.Allocator,
pub fn init(allocator: mem.Allocator) !*FileChunks {
var self = try allocator.create(FileChunks);
errdefer allocator.destroy(self);
self.allocator = allocator;
mem.set(ChunkMetadata, &self.chunks, .{ .raw = 0 });
self.ptr = try os.mmap(
null,
MaxFileSize,
os.PROT.NONE,
os.MAP.ANONYMOUS | os.MAP.PRIVATE,
-1,
0,
);
errdefer os.munmap(self.ptr);
return self;
}
pub fn deinit(self: *FileChunks) void {
os.munmap(self.ptr);
self.allocator.destroy(self);
}
fn tryClaimOwnership(self: *FileChunks, index: u64) ?ChunkMetadata {
while (true) {
var copy = self.chunks[index];
if (copy.mayReclaim() == false)
return null;
var modified = copy.reset(.Loading); // means that we own it for the duration...
if (self.chunks[index].tryUpdate(copy, modified))
return modified;
}
}
fn trySetToEmpty(self: *FileChunks, index: u64) !void {
while (true) {
var modified = self.chunks[index];
if (modified.getTag() != .Loading) {
// someone else modified it while we where releasing the memory
return error.ValueIsNotLoading;
}
var released = modified.reset(.Empty);
if (self.chunks[index].tryUpdate(modified, released))
break;
}
}
pub fn reclaim(self: *FileChunks) !u64 {
var reclaimed: u64 = 0;
var index: u64 = 0;
while (index < self.chunks.len) : (index += 1) {
var modified: ChunkMetadata = undefined;
if (self.tryClaimOwnership(index)) |m| {
// at this point, m is owned by us, and no one else can use it...
modified = m;
} else {
continue;
}
reclaimed += ChunkSize;
_ = try os.mmap(
self.getLoadedChunk(index).ptr,
ChunkSize,
os.PROT.NONE,
os.MAP.ANONYMOUS | os.MAP.PRIVATE | os.MAP.FIXED,
-1,
0,
);
try self.trySetToEmpty(index);
}
return reclaimed;
}
fn getLoadedChunk(self: *FileChunks, chunk: u64) []align(mem.page_size) u8 {
var offset = chunk * ChunkSize;
return self.ptr[offset..(offset + ChunkSize)];
}
pub fn tryGet(self: *FileChunks, chunk: u64) !?[]align(mem.page_size) u8 {
while (true) {
var copy = self.chunks[chunk];
var origin = copy;
switch (copy.getTag()) {
.Empty => return null,
.Error => return @intToError(@intCast(u16, copy.v.references)),
.Loading => return error.ValueIsLoading,
.Value => {},
}
try copy.addRef();
if (self.chunks[chunk].tryUpdate(origin, copy)) {
return self.getLoadedChunk(chunk);
}
}
}
pub fn getBlocking(self: *FileChunks, chunk: u64, timeout: ?u64) ![]align(mem.page_size) u8 {
while (true) {
var maybechunk = self.tryGet(chunk) catch |e| {
if (e == error.ValueIsLoading) {
var copy = self.chunks[chunk];
if (copy.getTag() == .Empty) {
try self.chunks[chunk].wait(copy, timeout);
}
continue;
}
return e;
};
if (maybechunk) |c| {
return c;
}
return error.ValueIsNotLoading;
}
}
pub fn release(self: *FileChunks, chunk: u64) !void {
while (true) {
var copy = self.chunks[chunk];
var origin = copy;
switch (copy.getTag()) {
.Empty => return error.ReleaseEmptyPage,
.Error => return error.ValueInErrorState,
.Loading => return error.ReleaseLoadingPage,
.Value => {},
}
try copy.release();
if (self.chunks[chunk].tryUpdate(origin, copy)) {
return;
}
}
}
pub fn markLoading(self: *FileChunks, chunk: u64) !?[]align(mem.page_size) u8 {
while (true) {
var copy = self.chunks[chunk];
var origin = copy;
switch (copy.v.tag) {
.Value => return error.ValueAlreadyExists,
.Error => return error.ValueInErrorState,
.Loading => return null, // already marked..
.Empty => {},
}
copy.setTag(.Loading);
if (self.chunks[chunk].tryUpdate(origin, copy)) {
var offset = chunk * ChunkSize;
const c = self.ptr[offset..(offset + ChunkSize)];
_ = try os.mmap(
c.ptr,
ChunkSize,
os.PROT.READ | os.PROT.WRITE,
os.MAP.ANONYMOUS | os.MAP.PRIVATE | os.MAP.FIXED,
-1,
0,
);
return c;
}
}
}
pub fn markLoaded(self: *FileChunks, chunk: u64) ![]align(mem.page_size) u8 {
while (true) {
var copy = self.chunks[chunk];
var origin = copy;
switch (copy.v.tag) {
.Value => return error.ValueAlreadyExists,
.Error => return error.ValueInErrorState,
.Empty => return error.ValueIsNotLoading,
.Loading => {},
}
copy.setTag(.Value);
try copy.addRef(); // ownership by the pager
try copy.addRef(); // ownership by the caller
if (self.chunks[chunk].tryUpdate(origin, copy)) {
return self.getLoadedChunk(chunk);
}
}
}
pub fn markLoadError(self: *FileChunks, chunk: u64, err: anyerror) !void {
while (true) {
var copy = self.chunks[chunk];
var origin = copy;
switch (copy.v.tag) {
.Value => return error.ValueAlreadyExists,
.Error => return error.ValueInErrorState,
.Empty => return error.ValueIsNotLoading,
.Loading => {},
}
copy.setTag(.Error);
copy.v.references = @errorToInt(err);
try copy.addRef();
if (self.chunks[chunk].tryUpdate(origin, copy)) {
return;
}
}
}
};
test "can create file pager" {
var allocator = testing.allocator;
var pager = try FileChunks.init(allocator);
defer pager.deinit();
}
test "will try load a chunk" {
var allocator = testing.allocator;
var pager = try FileChunks.init(allocator);
defer pager.deinit();
var chunk = try pager.tryGet(0);
try testing.expectEqual(chunk, null);
var dst = try pager.markLoading(0);
try testing.expect(dst != null);
mem.set(u8, dst.?, 1);
_ = try pager.markLoaded(0);
chunk = try pager.tryGet(0);
try testing.expect(chunk != null);
try testing.expect(1 == chunk.?[0]);
var chunkBlocking = try pager.getBlocking(0, null);
try testing.expect(1 == chunkBlocking[0]);
var snd = try pager.markLoading(1);
mem.set(u8, snd.?, 2);
var t = try std.Thread.spawn(.{}, complete_loading, .{ pager, 1 });
t.detach();
var val = try pager.getBlocking(1, 1000);
try testing.expect(2 == val[0]);
}
fn complete_loading(pager: *FileChunks, chunk: u64) void {
std.time.sleep(5); //let the schedule wait for it...
_ = pager.markLoaded(chunk) catch unreachable;
}
test "will be able to reclaim" {
var allocator = testing.allocator;
var pager = try FileChunks.init(allocator);
defer pager.deinit();
var dst = try pager.markLoading(0);
try testing.expect(dst != null);
mem.set(u8, dst.?, 1);
_ = try pager.markLoaded(0);
var snd = try pager.markLoading(1);
mem.set(u8, snd.?, 1);
_ = try pager.markLoaded(1);
try pager.release(0);
var reclaimed = try pager.reclaim();
try testing.expect(reclaimed == FileChunks.ChunkSize);
}
pub fn main() !void {}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment