Skip to content

Instantly share code, notes, and snippets.

@AngelicosPhosphoros
Created October 8, 2020 17:47
Show Gist options
  • Save AngelicosPhosphoros/c2c261f18c320d9a43152398e4765199 to your computer and use it in GitHub Desktop.
Save AngelicosPhosphoros/c2c261f18c320d9a43152398e4765199 to your computer and use it in GitHub Desktop.
Example of OVERLAPPED IO using WinAPI
#include <atomic>
#include <cassert>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
#include <vector>
#include "windows.h"
enum class Operation {
Read, Write
};
struct Task {
Operation operation;
HANDLE file;
OVERLAPPED overlapped{};
std::atomic<bool> is_ready = false;
// Either to read or write
std::vector<char> buffer;
Task(Operation operation,
HANDLE file) : operation(operation), file(file), buffer(1024, 0)
{}
};
struct WorkerContext {
bool continue_work() {
return continue_work_fl.load();
}
void stop_work() {
continue_work_fl.store(false);
}
void add_task(std::shared_ptr<Task> task) {
std::lock_guard<std::mutex> lock(tasks_mutex);
tasks.push_back(std::move(task));
}
std::optional<std::shared_ptr<Task>> get_task() {
if (!tasks_mutex.try_lock()) {
return std::nullopt;
}
std::optional<std::shared_ptr<Task>> result = std::nullopt;
if (!tasks.empty()) {
result = std::move(tasks.back());
tasks.pop_back();
}
tasks_mutex.unlock();
return result;
}
void lock_main_thread() {
std::unique_lock<std::mutex> lock(mt_mutex);
while (!mt_count) // Handle spurious wake-ups.
mt_condition.wait(lock);
--mt_count;
}
void notify_main_thread() {
std::unique_lock<std::mutex> lock(mt_mutex);
++mt_count;
mt_condition.notify_one();
}
std::vector<std::shared_ptr<Task>> to_remove;
private:
std::atomic<bool> continue_work_fl = true;
std::mutex tasks_mutex;
std::vector<std::shared_ptr<Task>> tasks;
// Where is semaphores :(
std::mutex mt_mutex;
std::condition_variable mt_condition;
size_t mt_count;
};
struct RoutineCallbackData {
std::shared_ptr<Task> task;
WorkerContext& context;
};
void finish_routine(DWORD dwErrorCode,
DWORD dwNumberOfBytesTransfered,
LPOVERLAPPED lpOverlapped) {
std::cout << "Some routine finished";
RoutineCallbackData& callback_data = *static_cast<RoutineCallbackData*>(lpOverlapped->hEvent);
assert(lpOverlapped == &callback_data.task->overlapped);
callback_data.task->is_ready = true;
callback_data.context.to_remove.push_back(callback_data.task);
callback_data.context.notify_main_thread();
}
void thread_worker(WorkerContext& context) {
std::vector<HANDLE> handles;
std::list<RoutineCallbackData> callback_datas;
while (context.continue_work()) {
while (auto new_task = context.get_task()) {
Task& task = **new_task;
switch (task.operation) {
case Operation::Read:
{
task.is_ready = false;
task.overlapped.Offset = task.overlapped.OffsetHigh = 0;
callback_datas.push_back({ *new_task, context });
task.overlapped.hEvent = &callback_datas.back();
ReadFileEx(task.file, task.buffer.data(), task.buffer.size(), &task.overlapped, finish_routine);
handles.push_back(task.file);
break;
}
case Operation::Write:
{
// TODO
break;
}
}
}
WaitForMultipleObjectsEx(handles.size(), handles.data(), false, 1, true);
for (auto task : context.to_remove) {
auto to_erase = std::remove(handles.begin(), handles.end(), task->file);
if (handles.end() != to_erase) {
handles.erase(to_erase);
}
callback_datas.remove_if([&task](RoutineCallbackData& d) {return d.task == task; });
}
}
}
int main() {
WorkerContext context;
std::thread worker(thread_worker, std::ref(context));
HANDLE file1 = CreateFileW(
L"test1.txt",
GENERIC_READ,
FILE_SHARE_READ,
nullptr,
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED,
nullptr
);
std::shared_ptr<Task> task1 = std::make_shared<Task>(Operation::Read, file1);
context.add_task(task1);
HANDLE file2 = CreateFileW(
L"test2.txt",
GENERIC_READ,
FILE_SHARE_READ,
nullptr,
OPEN_EXISTING,
FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED,
nullptr
);
std::shared_ptr<Task> task2 = std::make_shared<Task>(Operation::Read, file2);
context.add_task(task2);
bool is_f1_closed = false;
bool is_f2_closed = false;
while (!is_f1_closed || !is_f2_closed) {
std::cout << "Waiting in main thread" << std::endl;
context.lock_main_thread();
if (task1->is_ready.load() && !is_f1_closed) {
std::cout << "Task 1 is ready: " << std::endl;
std::cout << task1->buffer.data() << std::endl;
CloseHandle(file1);
is_f1_closed = true;
}
if (task2->is_ready.load() && !is_f2_closed) {
std::cout << "Task 2 is ready: " << std::endl;
std::cout << task2->buffer.data() << std::endl;
CloseHandle(file2);
is_f2_closed = true;
}
}
context.stop_work();
worker.join();
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment