Skip to content

Instantly share code, notes, and snippets.

@ayende
Created March 7, 2025 12:39
Show Gist options
  • Save ayende/eafff96d82d921e2848211b05301dc35 to your computer and use it in GitHub Desktop.
Save ayende/eafff96d82d921e2848211b05301dc35 to your computer and use it in GitHub Desktop.
#define NTDDI_VERSION NTDDI_WIN10_NI
#include <intrin.h>
#include <windows.h>
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <dbghelp.h>
#pragma comment(lib, "DbgHelp.lib")
/*
 * Rounds n up to the nearest power of two.
 *
 * 0 maps to 1 and exact powers of two are returned unchanged. For inputs
 * above 2^63 the result wraps to 0 (unsigned arithmetic).
 */
uint64_t nextPowerOf2(uint64_t n)
{
    if (n == 0)
        return 1;

    /* Smear the highest set bit of (n - 1) into every lower position. */
    uint64_t smeared = n - 1;
    for (unsigned shift = 1; shift < 64; shift <<= 1)
        smeared |= smeared >> shift;

    return smeared + 1;
}
// Kind of operation a struct workitem describes.
typedef enum workitem_type
{
// Zero value of the enum.
workitem_none,
// Write request; the only kind queued by rvn_write_io_ring in this file.
workitem_write,
// Not referenced by the code visible in this file.
workitem_fsync,
} workitem_type;
// One contiguous run of pages handed to rvn_write_io_ring.
struct page_to_write
{
// Page index; multiplied by 8192 to obtain the byte offset of the write.
int64_t page_num;
// Number of pages; multiplied by 8192 to obtain the byte size of the write.
int32_t count_of_pages;
// Source buffer pointer; copied verbatim into workitem.buffer.
void* ptr;
};
// A single queued request. Instances are carved out of the per-file arena by
// rvn_write_io_ring and pushed onto IoRing.head by queue_work.
struct workitem
{
// Link used by the shared IoRing.head list (set in queue_work, read by the worker).
struct workitem* next;
// Link to the item built just before this one in the same submission; the
// submitter walks this chain when polling for completion.
struct workitem* prev;
// 0 while pending; the worker stores 7 here before signalling `notify`.
// NOTE(review): passed to InterlockedExchange / InterlockedCompareExchange,
// which are declared for LONG volatile*; this field is a plain int.
int completed;
// File handle copied from the submitting struct handle.
HANDLE file;
// Event the worker sets after marking the item completed.
HANDLE notify;
// Copied into *detailed_error_code by the submitter when `errored` is set.
int result;
workitem_type type;
// Never set to true by the code visible in this file.
bool errored;
// Byte offset and byte size of the request (page values times 8192).
uint64_t offset;
uint64_t size;
// Source buffer pointer taken from page_to_write.ptr.
char* buffer;
};
// Process-wide state shared by submitting threads and the worker thread.
struct IoRingSetup
{
// Manual-reset event (created in main) that submitters set after queueing
// work; the worker waits on it and resets it.
HANDLE event;
// Handle of the worker thread running do_ring_work.
HANDLE thread;
// Polled with a zero timeout at the start of rvn_write_io_ring.
// NOTE(review): no code visible in this file creates or signals it.
HANDLE has_error;
// Not referenced by the code visible in this file.
int errored;
// Head of the singly linked list of pending items: pushed with a
// compare-exchange in queue_work, detached wholesale by the worker.
struct workitem* head;
};
// The single global instance used by every function in this file.
struct IoRingSetup IoRing;
void PrintStackTrace() {
HANDLE process = GetCurrentProcess();
SymInitialize(process, NULL, TRUE); // Initialize symbol handler
void* stack[64];
unsigned short frames = CaptureStackBackTrace(0, 64, stack, NULL);
SYMBOL_INFO* symbol = (SYMBOL_INFO*)calloc(sizeof(SYMBOL_INFO) + 256 * sizeof(char), 1);
symbol->MaxNameLen = 255;
symbol->SizeOfStruct = sizeof(SYMBOL_INFO);
IMAGEHLP_LINE64 line;
line.SizeOfStruct = sizeof(IMAGEHLP_LINE64);
for (unsigned short i = 0; i < frames; i++) {
DWORD64 address = (DWORD64)(stack[i]);
SymFromAddr(process, address, NULL, symbol);
DWORD displacement;
if (SymGetLineFromAddr64(process, address, &displacement, &line)) {
printf("Frame %02d: %s [0x%llx] (Line %lu)\n",
i, symbol->Name, address, line.LineNumber);
}
else {
printf("Frame %02d: %s [0x%llx] (Line info unavailable)\n",
i, symbol->Name, address);
}
}
free(symbol);
SymCleanup(process);
}
/*
 * Reports whether following `next` links from `head` ever loops back on
 * itself. Uses two cursors: one advancing a single node per round, the other
 * advancing two; they can only meet if the chain is circular.
 *
 * Returns false for an empty or single-node, NULL-terminated chain.
 */
bool hasCycle(struct workitem* head)
{
    if (!head || !head->next)
        return false;

    struct workitem* tortoise = head;
    struct workitem* hare = head->next;

    for (;;) {
        /* The fast cursor ran off the end: the chain terminates. */
        if (hare == NULL || hare->next == NULL)
            return false;

        /* Both cursors on the same node: the chain is circular. */
        if (tortoise == hare)
            return true;

        tortoise = tortoise->next;
        hare = hare->next->next;
    }
}
/*
 * Invariant-violation trap: dumps the current stack trace once, then beeps
 * forever so the process stays alive for a debugger to attach. Never returns.
 * Kept out-of-line (noinline) so it shows up as its own frame.
 */
__declspec(noinline) void alert()
{
    PrintStackTrace();

    for (;;) {
        Beep(800, 200);
        Sleep(200);
    }
}
void queue_work(struct workitem* work)
{
if (work->completed)
{
alert();
}
work->next = IoRing.head;
while (true)
{
if (hasCycle(work))
{
alert();
}
struct workitem* cur_head = InterlockedCompareExchangePointer(&IoRing.head, work, work->next);
if (cur_head == work->next)
break;
work->next = cur_head;
}
}
/*
 * Worker thread entry point. Repeatedly waits for IoRing.event, detaches the
 * whole pending list from IoRing.head, and marks every item completed
 * (completed = 7), signalling each item's notify event.
 *
 * No I/O is performed here; items are only acknowledged.
 *
 * lpThreadParameter is unused. The function never returns in practice.
 */
DWORD WINAPI do_ring_work(LPVOID lpThreadParameter)
{
    (void)lpThreadParameter;
    SetThreadDescription(GetCurrentThread(), L"Rvn.Ring.Wrkr");

    for (;;)
    {
        // wait for any writes on the event / completion on the ring
        if (WaitForSingleObject(IoRing.event, INFINITE) != WAIT_OBJECT_0)
        {
            alert();
        }
        /* Reset before detaching the list: a submitter that pushes after the
         * detach will set the event again and wake us for another pass. */
        ResetEvent(IoRing.event);

        struct workitem* work =
            InterlockedExchangePointer((PVOID volatile*)&IoRing.head, NULL);
        if (hasCycle(work))
        {
            alert();
        }

        while (work)
        {
            /* Copy everything we still need BEFORE publishing completion.
             * As soon as `completed` is non-zero the submitter may return and
             * reuse or reallocate the memory this item lives in. */
            struct workitem* next = work->next;
            HANDLE notify = work->notify;

            InterlockedExchange((LONG volatile*)&work->completed, 7);
            SetEvent(notify);

            work = next;
        }
    }
    return 0;
}
// State shared by every struct handle that points at it (see handle.global_state).
struct handle_global_state
{
// Held by rvn_write_io_ring for the whole duration of a submission.
CRITICAL_SECTION lock;
uint32_t ref_count;
int32_t open_flags;
// Event stored into each queued workitem; the worker sets it on completion
// and rvn_write_io_ring waits on it.
HANDLE notify;
char* file_path;
// Scratch buffer holding the workitem array of the current submission;
// grown with realloc in rvn_write_io_ring when too small.
void* arena;
// Thread id recorded whenever the arena is reallocated.
DWORD last_arena_thread;
// Current capacity of `arena` in bytes.
size_t arena_size;
};
// This state represents a single handle to the pager on a file;
// multiple such instances may exist at the same time.
struct handle
{
// Copied into workitem.file for every queued request.
HANDLE file_handle;
HANDLE file_mapping_handle;
void* read_address;
void* write_address;
int64_t allocation_size;
int32_t status_flags;
int64_t locked_memory;
// Shared state (lock, notify event, workitem arena) used by rvn_write_io_ring.
struct handle_global_state* global_state;
};
int32_t rvn_write_io_ring(
void* handle,
struct page_to_write* buffers,
int32_t count,
int32_t* detailed_error_code)
{
int32_t rc = 0;
struct handle* handle_ptr = handle;
if (count == 0)
return 0;
if (WaitForSingleObject(IoRing.has_error, 0) == WAIT_OBJECT_0)
{
*detailed_error_code = ERROR_IO_INCOMPLETE;
return 1;
}
EnterCriticalSection(&handle_ptr->global_state->lock);
size_t max_req_size = (size_t)count * sizeof(struct workitem);
size_t orign_size = handle_ptr->global_state->arena_size;
bool ralloced = false;
void* origin_arena = handle_ptr->global_state->arena;
if (handle_ptr->global_state->arena_size < max_req_size)
{
ralloced = true;
size_t size = (size_t)nextPowerOf2(max_req_size);
void* ptr = realloc(handle_ptr->global_state->arena, size);
if (!ptr)
{
*detailed_error_code = errno;
LeaveCriticalSection(&handle_ptr->global_state->lock);
return 3;
}
handle_ptr->global_state->last_arena_thread = GetCurrentThreadId();
handle_ptr->global_state->arena = ptr;
handle_ptr->global_state->arena_size = size;
}
ResetEvent(handle_ptr->global_state->notify);
void* old_global_state = handle_ptr->global_state;
char* buf = handle_ptr->global_state->arena;
struct workitem* prev = NULL;
char* old_arena = handle_ptr->global_state->arena;
for (int32_t curIdx = 0; curIdx < count; curIdx++)
{
uint64_t offset = buffers[curIdx].page_num * 8192;
uint64_t size = (uint64_t)buffers[curIdx].count_of_pages * 8192;
struct workitem* work = (struct workitem*)buf;
ptrdiff_t diff = (char*)work - (char*)handle_ptr->global_state->arena;
if (diff > (ptrdiff_t)handle_ptr->global_state->arena_size ||
diff < 0)
{
PrintStackTrace();
printf("Diff: %llu, old: %p, new: %p, work: %p\n", diff, old_arena, handle_ptr->global_state->arena, work);
alert();
}
if (old_arena != handle_ptr->global_state->arena)
{
printf("old_global_state : %p, current state: %p\n", old_global_state, handle_ptr->global_state);
alert();
}
buf += sizeof(struct workitem);
*work = (struct workitem){
.buffer = buffers[curIdx].ptr,
.size = size,
.completed = 0,
.type = workitem_write,
.file = handle_ptr->file_handle,
.offset = offset,
.errored = false,
.result = 0,
.prev = prev,
.notify = handle_ptr->global_state->notify,
};
if (old_arena != handle_ptr->global_state->arena)
{
printf("old_global_state : %p, current state: %p\n", old_global_state, handle_ptr->global_state);
printf("old_arena : %p, current state: %p\n", old_arena, handle_ptr->global_state->arena);
alert();
}
if (work->completed != 0) {
printf("old_global_state : %p, current state: %p\n", old_global_state, handle_ptr->global_state);
printf("old_arena : %p, current state: %p\n", old_arena, handle_ptr->global_state->arena);
alert();
}
prev = work;
queue_work(work);
}
SetEvent(IoRing.event);
bool all_done = false;
while (!all_done)
{
all_done = true;
rc = 0;
*detailed_error_code = 0;
if (WaitForSingleObject(handle_ptr->global_state->notify, INFINITE) == WAIT_FAILED)
{
*detailed_error_code = GetLastError();
rc = 3;
break;
}
ResetEvent(handle_ptr->global_state->notify);
struct workitem* work = prev;
while (work)
{
all_done &= InterlockedCompareExchange(&work->completed, 0, 0);
if (work->errored)
{
*detailed_error_code = work->result;
rc = 3;
// note that we still need to wait for the whole
// set to complete before we can safely return...
}
// move to the previous one...
work = work->prev;
}
}
LeaveCriticalSection(&handle_ptr->global_state->lock);
return rc;
}
/*
 * Runs one round of the stress scenario: builds a fresh handle with its own
 * global state, submits a small batch and then a large batch (forcing the
 * workitem arena to be reallocated in between), and tears the handle down.
 *
 * The page array is zero-filled, so every request has page 0, size 0 and a
 * NULL buffer.
 */
void once()
{
    enum { SMALL_BATCH = 2, LARGE_BATCH = 3939 };

    int32_t err = 0;
    int32_t rc = 0;

    struct handle_global_state* global_state = calloc(1, sizeof *global_state);
    struct handle* handle_ptr = calloc(1, sizeof *handle_ptr);
    /* Sized for the largest batch submitted below; a smaller array would be
     * read past its end by rvn_write_io_ring. */
    struct page_to_write* pages = calloc(LARGE_BATCH, sizeof *pages);
    if (!global_state || !handle_ptr || !pages)
    {
        printf("once: out of memory\n");
        goto out;
    }

    global_state->notify = CreateEvent(NULL, TRUE, FALSE, NULL);
    if (global_state->notify == NULL)
    {
        printf("once: CreateEvent failed: %lu\n", GetLastError());
        goto out;
    }
    InitializeCriticalSection(&global_state->lock);
    handle_ptr->global_state = global_state;

    rc = rvn_write_io_ring(handle_ptr, pages, SMALL_BATCH, &err);
    if (rc != 0)
    {
        printf("once: small batch failed: rc=%d err=%d\n", (int)rc, (int)err);
    }
    rc = rvn_write_io_ring(handle_ptr, pages, LARGE_BATCH, &err);
    if (rc != 0)
    {
        printf("once: large batch failed: rc=%d err=%d\n", (int)rc, (int)err);
    }

    DeleteCriticalSection(&global_state->lock);
    CloseHandle(global_state->notify);

out:
    /* NOTE(review): global_state->arena is deliberately NOT freed here. Its
     * nodes were linked into the shared list and other threads may still be
     * walking them (hasCycle in queue_work); releasing it safely needs a
     * quiescence protocol that this file does not have. */
    free(pages);
    free(handle_ptr);
    free(global_state);
}
int thread(void* state)
{
for (size_t i = 0; i < 1000; i++)
{
once();
DWORD threadId = GetCurrentThreadId();
// Get current time
SYSTEMTIME st;
GetLocalTime(&st);
// Print thread ID and time
printf("%lu @ %02d/%02d/%04d %02d:%02d:%02d.%03d\n",
threadId,
st.wDay, st.wMonth, st.wYear,
st.wHour, st.wMinute, st.wSecond, st.wMilliseconds);
}
return 0;
}
int main()
{
IoRing.event = CreateEvent(NULL, TRUE, FALSE, NULL);
IoRing.head = NULL;
IoRing.thread = CreateThread(NULL, 0, do_ring_work, NULL, 0, NULL);
const int num_threads = 32;
HANDLE threads[32];
for (int i = 0; i < num_threads; i++) {
threads[i] = CreateThread(
NULL, // Default security attributes
0, // Default stack size
thread, // Thread function
0, // No parameter passed to thread
0, // Run immediately
NULL // No need for thread ID
);
if (threads[i] == NULL) {
printf("Failed to create thread %d: %lu\n", i, GetLastError());
// Cleanup already created threads
for (int j = 0; j < i; j++) {
CloseHandle(threads[j]);
}
return 1;
}
}
// Wait for all threads to complete
auto r = WaitForMultipleObjects(
num_threads, // Number of threads
threads, // Array of thread handles
TRUE, // Wait for all to finish
INFINITE // No timeout
);
// Clean up thread handles
for (int i = 0; i < num_threads; i++) {
CloseHandle(threads[i]);
}
printf("All threads completed\n");
return 0;
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment