-
-
Save ayende/f53d266c362707439a06f1d99608bd8f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const std = @import("std"); | |
const builtin = @import("builtin"); | |
const os = std.os; | |
const mem = std.mem; | |
const testing = std.testing; | |
/// A run of pages together with the memory that holds them.
/// NOTE(review): `Page` is not referenced anywhere else in this file; the
/// field descriptions below are inferred from the field names - confirm
/// against the callers.
pub const Page = struct {
    /// Presumably the index of the first page of the run.
    page: u64,
    /// Presumably how many pages the run spans.
    numberOfPages: u32,
    /// Bytes backing the run.
    buffer: []u8,
};
/// Tracks a large, lazily backed virtual address range split into fixed-size
/// chunks. Each chunk's state lives in a single 64-bit word that is only
/// changed with compare-and-swap, so the API can be called from several
/// threads.
pub const FileChunks = struct {
    pub const MaxFileSize = 8 * 1024 * 1024 * 1024; // 8 GB
    pub const ChunkSize = 2 * 1024 * 1024; // 2 MB
    pub const MaxChunksInFile = MaxFileSize / ChunkSize; // 4096
    pub const PageSize = 8 * 1024; // 8 KB
    pub const PagesInChunk = ChunkSize / PageSize; // 256

    /// Per-chunk state packed into one 64-bit word so that every transition
    /// is a single CAS (see `tryUpdate`).
    ///
    /// `v` and `futex` overlay the same word as `raw`:
    ///  - the first 32 bits are version (30 bits) + tag (2 bits); this half
    ///    is what futex waiters compare against;
    ///  - the last 32 bits are `references` (in the `.Error` state they hold
    ///    the error code instead).
    pub const ChunkMetadata = packed union {
        pub const Tag = enum(u2) {
            /// Not backed by usable memory (initial state, and after reclaim).
            Empty = 0b00,
            /// Loading failed; `references` holds the error code.
            Error = 0b01,
            /// Owned exclusively by a loader (or by `reclaim`).
            Loading = 0b10,
            /// Holds loaded data.
            Value = 0b11,
        };

        /// The whole word; this is what the CAS operates on.
        raw: u64,
        /// View used for futex waits.
        futex: packed struct {
            value: u32, // tag & version
            references: u32, // ignored
            /// Returns `value` as an atomic so it can be passed to `std.Thread.Futex`.
            pub fn val(self: *@This()) *std.atomic.Atomic(u32) {
                return @ptrCast(*std.atomic.Atomic(u32), @alignCast(@alignOf(u32), &self.value));
            }
        },
        /// Field-level view.
        v: packed struct {
            version: u30,
            tag: Tag,
            // references == 0 - this is unused
            // references == 1 - just the pager is holding this
            // references >= 2 - external entity is holding this
            // (in the .Error state this field holds the error code instead)
            references: u32,
        },

        pub fn getTag(self: *ChunkMetadata) Tag {
            return self.v.tag;
        }

        /// Sets the tag and bumps the version so futex waiters see a change.
        pub fn setTag(self: *ChunkMetadata, tag: Tag) void {
            self.v.version +%= 1;
            self.v.tag = tag;
        }

        /// Drops one reference and bumps the version. Fails if the count is
        /// already zero.
        pub fn release(self: *ChunkMetadata) !void {
            self.v.version +%= 1;
            if (self.v.references == 0) {
                return error.ChunkReferenceCountUnderfow;
            }
            self.v.references -= 1;
        }

        /// Adds one reference and bumps the version. Fails if the 32-bit count
        /// would overflow.
        pub fn addRef(self: *ChunkMetadata) !void {
            if (self.v.references == std.math.maxInt(u32)) {
                return error.ChunkReferenceCountOverflow;
            }
            self.v.references += 1;
            self.v.version +%= 1;
        }

        /// Blocks while the version/tag half of this word still equals that
        /// of `version`. `timeout` is in nanoseconds (null = no limit). Like
        /// any futex wait it may also return spuriously.
        pub fn wait(self: *ChunkMetadata, version: ChunkMetadata, timeout: ?u64) !void {
            try std.Thread.Futex.wait(self.futex.val(), version.futex.value, timeout);
        }

        /// Atomically replaces `origin` with `newVal`. On success it wakes
        /// every thread blocked in `wait` on this word.
        ///
        /// AcqRel/Acquire ordering publishes the chunk contents written before
        /// a successful update to any thread that later observes the new
        /// state with an acquire load (`FileChunks.loadChunk`). The CAS is
        /// weak and may fail spuriously, so callers retry in a loop.
        pub fn tryUpdate(self: *ChunkMetadata, origin: ChunkMetadata, newVal: ChunkMetadata) bool {
            if (@cmpxchgWeak(u64, &self.raw, origin.raw, newVal.raw, .AcqRel, .Acquire) != null) {
                return false;
            }
            std.Thread.Futex.wake(self.futex.val(), std.math.maxInt(u32));
            return true;
        }

        /// True when the chunk holds data and only the pager references it.
        pub fn mayReclaim(self: *ChunkMetadata) bool {
            return self.v.references == 1 and self.v.tag == .Value;
        }

        /// Returns a copy with `tag`, zero references and a bumped version;
        /// `self` is not modified.
        pub fn reset(self: *ChunkMetadata, tag: Tag) ChunkMetadata {
            return ChunkMetadata{ .v = .{
                .tag = tag,
                .version = self.v.version +% 1,
                .references = 0,
            } };
        }
    };

    comptime {
        // tryUpdate swaps the whole state with one 64-bit CAS.
        if (@sizeOf(ChunkMetadata) != @sizeOf(u64)) {
            @compileError("ChunkMetadata should be exactly 64 bits in length");
        }
    }

    /// One state word per chunk of the reservation.
    chunks: [MaxChunksInFile]ChunkMetadata,
    /// The reserved address range, `MaxFileSize` bytes long.
    ptr: []align(mem.page_size) u8,
    /// Allocator that owns this struct; used again by `deinit`.
    allocator: mem.Allocator,

    /// Allocates the tracker and reserves `MaxFileSize` bytes of address
    /// space. The reservation is PROT.NONE; `markLoading` maps individual
    /// chunks read/write. Release with `deinit`.
    pub fn init(allocator: mem.Allocator) !*FileChunks {
        const self = try allocator.create(FileChunks);
        errdefer allocator.destroy(self);
        self.allocator = allocator;
        mem.set(ChunkMetadata, &self.chunks, .{ .raw = 0 });
        // Last fallible step: nothing after it needs unwinding.
        self.ptr = try os.mmap(
            null,
            MaxFileSize,
            os.PROT.NONE,
            os.MAP.ANONYMOUS | os.MAP.PRIVATE,
            -1,
            0,
        );
        return self;
    }

    /// Unmaps the reservation and frees the tracker.
    pub fn deinit(self: *FileChunks) void {
        os.munmap(self.ptr);
        self.allocator.destroy(self);
    }

    /// Atomically snapshots the state word of chunk `index` (acquire, pairs
    /// with the release half of `ChunkMetadata.tryUpdate`).
    fn loadChunk(self: *FileChunks, index: u64) ChunkMetadata {
        return .{ .raw = @atomicLoad(u64, &self.chunks[index].raw, .Acquire) };
    }

    /// If chunk `index` is reclaimable (`mayReclaim`), moves it to `.Loading`
    /// with zero references so the caller owns it, and returns the new state.
    /// Returns null when the chunk is not reclaimable.
    fn tryClaimOwnership(self: *FileChunks, index: u64) ?ChunkMetadata {
        while (true) {
            var current = self.loadChunk(index);
            if (!current.mayReclaim()) return null;
            const claimed = current.reset(.Loading); // means that we own it for the duration...
            if (self.chunks[index].tryUpdate(current, claimed)) return claimed;
        }
    }

    /// Moves a `.Loading` chunk to `.Empty`. Fails with
    /// `error.ValueIsNotLoading` if it is no longer `.Loading`.
    fn trySetToEmpty(self: *FileChunks, index: u64) !void {
        while (true) {
            var current = self.loadChunk(index);
            if (current.getTag() != .Loading) {
                // someone else modified it while we were releasing the memory
                return error.ValueIsNotLoading;
            }
            const released = current.reset(.Empty);
            if (self.chunks[index].tryUpdate(current, released)) return;
        }
    }

    /// Discards the memory of every chunk that only the pager references,
    /// returning those chunks to `.Empty`. Returns the number of bytes reclaimed.
    pub fn reclaim(self: *FileChunks) !u64 {
        var reclaimed: u64 = 0;
        var index: u64 = 0;
        while (index < self.chunks.len) : (index += 1) {
            // Claiming marks the chunk .Loading, so tryGet/markLoading/release
            // refuse it while we replace its memory.
            // NOTE(review): markLoaded still accepts a .Loading chunk, so a
            // misbehaving concurrent loader could race with this.
            if (self.tryClaimOwnership(index) == null) continue;
            _ = os.mmap(
                self.getLoadedChunk(index).ptr,
                ChunkSize,
                os.PROT.NONE,
                os.MAP.ANONYMOUS | os.MAP.PRIVATE | os.MAP.FIXED,
                -1,
                0,
            ) catch |err| {
                // Don't leave the chunk stuck in .Loading with no owner; the
                // mmap error is the one worth reporting.
                self.trySetToEmpty(index) catch {};
                return err;
            };
            try self.trySetToEmpty(index);
            reclaimed += ChunkSize;
        }
        return reclaimed;
    }

    /// Returns the part of the reservation that backs `chunk`. It neither
    /// checks nor changes the chunk's state.
    fn getLoadedChunk(self: *FileChunks, chunk: u64) []align(mem.page_size) u8 {
        const offset = chunk * ChunkSize;
        return self.ptr[offset..(offset + ChunkSize)];
    }

    /// Returns the chunk's memory and takes a reference (undo with `release`).
    /// Returns null if the chunk is `.Empty`, `error.ValueIsLoading` while it
    /// is being loaded, and the recorded error if its load failed.
    pub fn tryGet(self: *FileChunks, chunk: u64) !?[]align(mem.page_size) u8 {
        while (true) {
            const origin = self.loadChunk(chunk);
            var updated = origin;
            switch (updated.getTag()) {
                .Empty => return null,
                // markLoadError stores the raw error code in `references`.
                .Error => return @intToError(@intCast(u16, updated.v.references)),
                .Loading => return error.ValueIsLoading,
                .Value => {},
            }
            try updated.addRef();
            if (self.chunks[chunk].tryUpdate(origin, updated)) {
                return self.getLoadedChunk(chunk);
            }
        }
    }

    /// Like `tryGet`, but while the chunk is `.Loading` it sleeps on the
    /// chunk's futex instead of failing. Each individual wait is bounded by
    /// `timeout` nanoseconds (null = unbounded); a timeout error from the
    /// futex is propagated. Returns `error.ValueIsNotLoading` if the chunk is
    /// (or becomes) `.Empty`.
    pub fn getBlocking(self: *FileChunks, chunk: u64, timeout: ?u64) ![]align(mem.page_size) u8 {
        while (true) {
            const maybe_chunk = self.tryGet(chunk) catch |e| {
                if (e != error.ValueIsLoading) return e;
                var snapshot = self.loadChunk(chunk);
                // Sleep only while a load is still in progress. If the state
                // already moved on, retry tryGet right away. If it changes
                // between this load and the wait, the futex value differs and
                // the wait returns immediately, so no wake-up is lost.
                if (snapshot.getTag() == .Loading) {
                    try self.chunks[chunk].wait(snapshot, timeout);
                }
                continue;
            };
            if (maybe_chunk) |c| {
                return c;
            }
            return error.ValueIsNotLoading;
        }
    }

    /// Drops a reference previously taken by `tryGet`/`getBlocking`/`markLoaded`.
    /// The pager's own reference (the last one) cannot be released this way.
    pub fn release(self: *FileChunks, chunk: u64) !void {
        while (true) {
            const origin = self.loadChunk(chunk);
            var updated = origin;
            switch (updated.getTag()) {
                .Empty => return error.ReleaseEmptyPage,
                .Error => return error.ValueInErrorState,
                .Loading => return error.ReleaseLoadingPage,
                .Value => {},
            }
            // references == 1 is the pager's own hold. Dropping it would leave a
            // .Value chunk with no owner that reclaim can never pick up.
            if (updated.v.references <= 1) {
                return error.ChunkReferenceCountUnderfow;
            }
            try updated.release();
            if (self.chunks[chunk].tryUpdate(origin, updated)) {
                return;
            }
        }
    }

    /// Claims an `.Empty` chunk for loading and maps its memory read/write.
    /// Returns the writable memory, or null if another caller is already
    /// loading it. The caller must finish with `markLoaded` or `markLoadError`.
    pub fn markLoading(self: *FileChunks, chunk: u64) !?[]align(mem.page_size) u8 {
        while (true) {
            const origin = self.loadChunk(chunk);
            var updated = origin;
            switch (updated.getTag()) {
                .Value => return error.ValueAlreadyExists,
                .Error => return error.ValueInErrorState,
                .Loading => return null, // already marked..
                .Empty => {},
            }
            updated.setTag(.Loading);
            if (!self.chunks[chunk].tryUpdate(origin, updated)) continue;

            const c = self.getLoadedChunk(chunk);
            _ = os.mmap(
                c.ptr,
                ChunkSize,
                os.PROT.READ | os.PROT.WRITE,
                os.MAP.ANONYMOUS | os.MAP.PRIVATE | os.MAP.FIXED,
                -1,
                0,
            ) catch |err| {
                // Roll our claim back so the chunk isn't stuck in .Loading
                // (with waiters) when no one will ever finish loading it.
                self.trySetToEmpty(chunk) catch {};
                return err;
            };
            return c;
        }
    }

    /// Publishes a `.Loading` chunk as `.Value`, with one reference for the
    /// pager and one for the caller, and wakes waiters. Returns its memory.
    pub fn markLoaded(self: *FileChunks, chunk: u64) ![]align(mem.page_size) u8 {
        while (true) {
            const origin = self.loadChunk(chunk);
            var updated = origin;
            switch (updated.getTag()) {
                .Value => return error.ValueAlreadyExists,
                .Error => return error.ValueInErrorState,
                .Empty => return error.ValueIsNotLoading,
                .Loading => {},
            }
            updated.setTag(.Value);
            try updated.addRef(); // ownership by the pager
            try updated.addRef(); // ownership by the caller
            if (self.chunks[chunk].tryUpdate(origin, updated)) {
                return self.getLoadedChunk(chunk);
            }
        }
    }

    /// Moves a `.Loading` chunk to `.Error` and records `err`. Later
    /// `tryGet`/`getBlocking` calls return that same error.
    pub fn markLoadError(self: *FileChunks, chunk: u64, err: anyerror) !void {
        while (true) {
            const origin = self.loadChunk(chunk);
            var updated = origin;
            switch (updated.getTag()) {
                .Value => return error.ValueAlreadyExists,
                .Error => return error.ValueInErrorState,
                .Empty => return error.ValueIsNotLoading,
                .Loading => {},
            }
            updated.setTag(.Error);
            // Store the code verbatim: tryGet turns `references` straight back
            // into an error, so it must not also be used as a reference count.
            updated.v.references = @errorToInt(err);
            if (self.chunks[chunk].tryUpdate(origin, updated)) {
                return;
            }
        }
    }
};

test "can create file pager" {
    const pager = try FileChunks.init(testing.allocator);
    defer pager.deinit();
}

test "will try load a chunk" {
    const pager = try FileChunks.init(testing.allocator);
    defer pager.deinit();

    var chunk = try pager.tryGet(0);
    try testing.expect(chunk == null);

    const dst = try pager.markLoading(0);
    try testing.expect(dst != null);
    mem.set(u8, dst.?, 1);
    _ = try pager.markLoaded(0);

    chunk = try pager.tryGet(0);
    try testing.expect(chunk != null);
    try testing.expect(1 == chunk.?[0]);

    const chunkBlocking = try pager.getBlocking(0, null);
    try testing.expect(1 == chunkBlocking[0]);

    const snd = try pager.markLoading(1);
    mem.set(u8, snd.?, 2);
    // Join rather than detach: the loader must be done touching `pager`
    // before the deferred deinit frees it (defers run in reverse order).
    const t = try std.Thread.spawn(.{}, complete_loading, .{ pager, 1 });
    defer t.join();
    // Generous timeout: the loader thread has to be scheduled first.
    const val = try pager.getBlocking(1, std.time.ns_per_s);
    try testing.expect(2 == val[0]);
}

fn complete_loading(pager: *FileChunks, chunk: u64) void {
    // Give the main thread time to actually block in getBlocking.
    std.time.sleep(std.time.ns_per_ms);
    _ = pager.markLoaded(chunk) catch |e| std.debug.panic("markLoaded failed: {s}", .{@errorName(e)});
}

test "will be able to reclaim" {
    const pager = try FileChunks.init(testing.allocator);
    defer pager.deinit();

    const dst = try pager.markLoading(0);
    try testing.expect(dst != null);
    mem.set(u8, dst.?, 1);
    _ = try pager.markLoaded(0);

    const snd = try pager.markLoading(1);
    mem.set(u8, snd.?, 1);
    _ = try pager.markLoaded(1);

    // Chunk 0 drops back to the pager's own reference; chunk 1 is still held.
    try pager.release(0);
    const reclaimed = try pager.reclaim();
    try testing.expect(reclaimed == FileChunks.ChunkSize);
    try testing.expect((try pager.tryGet(0)) == null);
    try testing.expect((try pager.tryGet(1)) != null);
}
pub fn main() !void {
    // Intentionally empty: the functionality in this file is exercised
    // through its `test` blocks.
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment