-
-
Save ayende/eafff96d82d921e2848211b05301dc35 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#define NTDDI_VERSION NTDDI_WIN10_NI

#include <intrin.h>
#include <windows.h>
#include <dbghelp.h>

#include <errno.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

#pragma comment(lib, "DbgHelp.lib")
// Rounds n up to the nearest power of two; a value that already is a power
// of two is returned unchanged, and nextPowerOf2(0) is defined as 1.
// Inputs above 2^63 have no 64-bit answer: the result wraps around to 0.
uint64_t nextPowerOf2(uint64_t n)
{
    if (!n)
        return 1;

    uint64_t v = n - 1;
    // Smear the highest set bit into every lower bit position
    // (shifts of 1, 2, 4, 8, 16 and 32 cover all 64 bits).
    for (unsigned shift = 1; shift < 64; shift <<= 1)
        v |= v >> shift;

    return v + 1;
}
// Kind of operation a queued workitem represents.
typedef enum workitem_type
{
    workitem_none = 0,  // no operation
    workitem_write = 1, // page write; set by rvn_write_io_ring
    workitem_fsync = 2, // not produced in this file (presumably a flush request - confirm)
} workitem_type;
// One contiguous run of pages to write. rvn_write_io_ring turns each entry
// into a write of count_of_pages * 8192 bytes, taken from ptr, at byte
// offset page_num * 8192. Field order is kept as-is so the layout does not change.
struct page_to_write
{
    int64_t page_num;       // index of the first page in the run
    int32_t count_of_pages; // number of consecutive pages
    void *ptr;              // source data for the run
};
struct workitem | |
{ | |
struct workitem* next; | |
struct workitem* prev; | |
int completed; | |
HANDLE file; | |
HANDLE notify; | |
int result; | |
workitem_type type; | |
bool errored; | |
uint64_t offset; | |
uint64_t size; | |
char* buffer; | |
}; | |
struct IoRingSetup | |
{ | |
HANDLE event; | |
HANDLE thread; | |
HANDLE has_error; | |
int errored; | |
struct workitem* head; | |
}; | |
struct IoRingSetup IoRing; | |
void PrintStackTrace() { | |
HANDLE process = GetCurrentProcess(); | |
SymInitialize(process, NULL, TRUE); // Initialize symbol handler | |
void* stack[64]; | |
unsigned short frames = CaptureStackBackTrace(0, 64, stack, NULL); | |
SYMBOL_INFO* symbol = (SYMBOL_INFO*)calloc(sizeof(SYMBOL_INFO) + 256 * sizeof(char), 1); | |
symbol->MaxNameLen = 255; | |
symbol->SizeOfStruct = sizeof(SYMBOL_INFO); | |
IMAGEHLP_LINE64 line; | |
line.SizeOfStruct = sizeof(IMAGEHLP_LINE64); | |
for (unsigned short i = 0; i < frames; i++) { | |
DWORD64 address = (DWORD64)(stack[i]); | |
SymFromAddr(process, address, NULL, symbol); | |
DWORD displacement; | |
if (SymGetLineFromAddr64(process, address, &displacement, &line)) { | |
printf("Frame %02d: %s [0x%llx] (Line %lu)\n", | |
i, symbol->Name, address, line.LineNumber); | |
} | |
else { | |
printf("Frame %02d: %s [0x%llx] (Line info unavailable)\n", | |
i, symbol->Name, address); | |
} | |
} | |
free(symbol); | |
SymCleanup(process); | |
} | |
bool hasCycle(struct workitem* head) | |
{ | |
if (head == NULL || head->next == NULL) | |
{ | |
return false; | |
} | |
struct workitem* slow = head; // Tortoise: moves one step | |
struct workitem* fast = head->next; // Hare: moves two steps | |
while (fast != NULL && fast->next != NULL) | |
{ | |
if (slow == fast) | |
{ | |
return true; // Cycle detected: pointers meet | |
} | |
slow = slow->next; // Move one step | |
fast = fast->next->next; // Move two steps | |
} | |
return false; // No cycle: reached end (NULL) | |
} | |
__declspec(noinline) void alert() | |
{ | |
PrintStackTrace(); | |
while (1) | |
{ | |
Beep(800, 200); | |
Sleep(200); | |
} | |
} | |
void queue_work(struct workitem* work) | |
{ | |
if (work->completed) | |
{ | |
alert(); | |
} | |
work->next = IoRing.head; | |
while (true) | |
{ | |
if (hasCycle(work)) | |
{ | |
alert(); | |
} | |
struct workitem* cur_head = InterlockedCompareExchangePointer(&IoRing.head, work, work->next); | |
if (cur_head == work->next) | |
break; | |
work->next = cur_head; | |
} | |
} | |
// Ring worker thread (started from main()). Each time IoRing.event is
// signalled, it detaches the entire pending list from IoRing.head. For each
// detached item it marks the item completed and signals the item's notify
// event. It does not perform the I/O the item describes: file, offset, buffer
// and type are never read here.
DWORD WINAPI do_ring_work(LPVOID lpThreadParameter)
{
    (void)lpThreadParameter;
    SetThreadDescription(GetCurrentThread(), L"Rvn.Ring.Wrkr");

    for (;;)
    {
        // wait for any writes on the event / completion on the ring
        if (WaitForSingleObject(IoRing.event, INFINITE) != WAIT_OBJECT_0)
        {
            alert();
        }
        // Reset before detaching the list. Any item pushed after the exchange
        // below is followed by a fresh SetEvent from its producer, so it cannot be missed.
        ResetEvent(IoRing.event);

        struct workitem* work = InterlockedExchangePointer((void* volatile*)&IoRing.head, NULL);
        // Walking is safe here: none of the detached items is completed yet, so
        // their owners are still waiting and cannot recycle them.
        if (hasCycle(work))
        {
            alert();
        }

        while (work)
        {
            // Read everything needed from the item BEFORE publishing completion.
            // Once `completed` is set, the owning thread may return from
            // rvn_write_io_ring and reuse, reallocate or free the arena holding
            // this item, so touching `work` afterwards would be a use-after-free.
            struct workitem* next = work->next;
            HANDLE notify = work->notify;
            InterlockedExchange((volatile long*)&work->completed, 7);
            // NOTE(review): the owner may also close `notify` before this call runs;
            // a recycled handle value would then receive a spurious SetEvent.
            SetEvent(notify);
            work = next;
        }
    }
    return 0;
}
struct handle_global_state | |
{ | |
CRITICAL_SECTION lock; | |
uint32_t ref_count; | |
int32_t open_flags; | |
HANDLE notify; | |
char* file_path; | |
void* arena; | |
DWORD last_arena_thread; | |
size_t arena_size; | |
}; | |
// This state represent a single handle to the pager on a file | |
// multiple such instances may exists at the same time | |
struct handle | |
{ | |
HANDLE file_handle; | |
HANDLE file_mapping_handle; | |
void* read_address; | |
void* write_address; | |
int64_t allocation_size; | |
int32_t status_flags; | |
int64_t locked_memory; | |
struct handle_global_state* global_state; | |
}; | |
int32_t rvn_write_io_ring( | |
void* handle, | |
struct page_to_write* buffers, | |
int32_t count, | |
int32_t* detailed_error_code) | |
{ | |
int32_t rc = 0; | |
struct handle* handle_ptr = handle; | |
if (count == 0) | |
return 0; | |
if (WaitForSingleObject(IoRing.has_error, 0) == WAIT_OBJECT_0) | |
{ | |
*detailed_error_code = ERROR_IO_INCOMPLETE; | |
return 1; | |
} | |
EnterCriticalSection(&handle_ptr->global_state->lock); | |
size_t max_req_size = (size_t)count * sizeof(struct workitem); | |
size_t orign_size = handle_ptr->global_state->arena_size; | |
bool ralloced = false; | |
void* origin_arena = handle_ptr->global_state->arena; | |
if (handle_ptr->global_state->arena_size < max_req_size) | |
{ | |
ralloced = true; | |
size_t size = (size_t)nextPowerOf2(max_req_size); | |
void* ptr = realloc(handle_ptr->global_state->arena, size); | |
if (!ptr) | |
{ | |
*detailed_error_code = errno; | |
LeaveCriticalSection(&handle_ptr->global_state->lock); | |
return 3; | |
} | |
handle_ptr->global_state->last_arena_thread = GetCurrentThreadId(); | |
handle_ptr->global_state->arena = ptr; | |
handle_ptr->global_state->arena_size = size; | |
} | |
ResetEvent(handle_ptr->global_state->notify); | |
void* old_global_state = handle_ptr->global_state; | |
char* buf = handle_ptr->global_state->arena; | |
struct workitem* prev = NULL; | |
char* old_arena = handle_ptr->global_state->arena; | |
for (int32_t curIdx = 0; curIdx < count; curIdx++) | |
{ | |
uint64_t offset = buffers[curIdx].page_num * 8192; | |
uint64_t size = (uint64_t)buffers[curIdx].count_of_pages * 8192; | |
struct workitem* work = (struct workitem*)buf; | |
ptrdiff_t diff = (char*)work - (char*)handle_ptr->global_state->arena; | |
if (diff > (ptrdiff_t)handle_ptr->global_state->arena_size || | |
diff < 0) | |
{ | |
PrintStackTrace(); | |
printf("Diff: %llu, old: %p, new: %p, work: %p\n", diff, old_arena, handle_ptr->global_state->arena, work); | |
alert(); | |
} | |
if (old_arena != handle_ptr->global_state->arena) | |
{ | |
printf("old_global_state : %p, current state: %p\n", old_global_state, handle_ptr->global_state); | |
alert(); | |
} | |
buf += sizeof(struct workitem); | |
*work = (struct workitem){ | |
.buffer = buffers[curIdx].ptr, | |
.size = size, | |
.completed = 0, | |
.type = workitem_write, | |
.file = handle_ptr->file_handle, | |
.offset = offset, | |
.errored = false, | |
.result = 0, | |
.prev = prev, | |
.notify = handle_ptr->global_state->notify, | |
}; | |
if (old_arena != handle_ptr->global_state->arena) | |
{ | |
printf("old_global_state : %p, current state: %p\n", old_global_state, handle_ptr->global_state); | |
printf("old_arena : %p, current state: %p\n", old_arena, handle_ptr->global_state->arena); | |
alert(); | |
} | |
if (work->completed != 0) { | |
printf("old_global_state : %p, current state: %p\n", old_global_state, handle_ptr->global_state); | |
printf("old_arena : %p, current state: %p\n", old_arena, handle_ptr->global_state->arena); | |
alert(); | |
} | |
prev = work; | |
queue_work(work); | |
} | |
SetEvent(IoRing.event); | |
bool all_done = false; | |
while (!all_done) | |
{ | |
all_done = true; | |
rc = 0; | |
*detailed_error_code = 0; | |
if (WaitForSingleObject(handle_ptr->global_state->notify, INFINITE) == WAIT_FAILED) | |
{ | |
*detailed_error_code = GetLastError(); | |
rc = 3; | |
break; | |
} | |
ResetEvent(handle_ptr->global_state->notify); | |
struct workitem* work = prev; | |
while (work) | |
{ | |
all_done &= InterlockedCompareExchange(&work->completed, 0, 0); | |
if (work->errored) | |
{ | |
*detailed_error_code = work->result; | |
rc = 3; | |
// note that we still need to wait for the whole | |
// set to complete before we can safely return... | |
} | |
// move to the previous one... | |
work = work->prev; | |
} | |
} | |
LeaveCriticalSection(&handle_ptr->global_state->lock); | |
return rc; | |
} | |
void once() | |
{ | |
void* handle; | |
void* mem; | |
void* wmem; | |
int64_t size; | |
int32_t err; | |
struct handle_global_state* global_state = calloc(1, sizeof(struct handle_global_state)); | |
InitializeCriticalSection(&global_state->lock); | |
global_state->notify = CreateEvent(NULL, TRUE, FALSE, NULL); | |
struct handle* handle_ptr = calloc(1, sizeof(struct handle)); | |
handle_ptr->global_state = global_state; | |
struct page_to_write* pages = calloc(3400, sizeof(struct page_to_write)); | |
int rc = rvn_write_io_ring(handle_ptr, pages, 2, &err); | |
rc = rvn_write_io_ring(handle_ptr, pages, 3939, &err); | |
DeleteCriticalSection(&global_state->lock); | |
CloseHandle(global_state->notify); | |
} | |
int thread(void* state) | |
{ | |
for (size_t i = 0; i < 1000; i++) | |
{ | |
once(); | |
DWORD threadId = GetCurrentThreadId(); | |
// Get current time | |
SYSTEMTIME st; | |
GetLocalTime(&st); | |
// Print thread ID and time | |
printf("%lu @ %02d/%02d/%04d %02d:%02d:%02d.%03d\n", | |
threadId, | |
st.wDay, st.wMonth, st.wYear, | |
st.wHour, st.wMinute, st.wSecond, st.wMilliseconds); | |
} | |
return 0; | |
} | |
int main() | |
{ | |
IoRing.event = CreateEvent(NULL, TRUE, FALSE, NULL); | |
IoRing.head = NULL; | |
IoRing.thread = CreateThread(NULL, 0, do_ring_work, NULL, 0, NULL); | |
const int num_threads = 32; | |
HANDLE threads[32]; | |
for (int i = 0; i < num_threads; i++) { | |
threads[i] = CreateThread( | |
NULL, // Default security attributes | |
0, // Default stack size | |
thread, // Thread function | |
0, // No parameter passed to thread | |
0, // Run immediately | |
NULL // No need for thread ID | |
); | |
if (threads[i] == NULL) { | |
printf("Failed to create thread %d: %lu\n", i, GetLastError()); | |
// Cleanup already created threads | |
for (int j = 0; j < i; j++) { | |
CloseHandle(threads[j]); | |
} | |
return 1; | |
} | |
} | |
// Wait for all threads to complete | |
auto r = WaitForMultipleObjects( | |
num_threads, // Number of threads | |
threads, // Array of thread handles | |
TRUE, // Wait for all to finish | |
INFINITE // No timeout | |
); | |
// Clean up thread handles | |
for (int i = 0; i < num_threads; i++) { | |
CloseHandle(threads[i]); | |
} | |
printf("All threads completed\n"); | |
return 0; | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment