Last active
March 15, 2023 01:44
-
-
Save meshula/9af43ea1097881ccea5b7b9cbc9fa839 to your computer and use it in GitHub Desktop.
thread pool to test exr utilities
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdio>
#include <exception>
#include <filesystem>
#include <fstream>
#include <functional>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <sstream>
#include <stdexcept>
#include <string>
#include <system_error>
#include <thread>
#include <utility>
#include <vector>
// Thread-safe FIFO queue protected by one mutex, intended for a single producer
// feeding several consumers. Consumers either poll with try_pop() or block in
// wait_and_pop() until push() supplies an element.
template <typename T>
class spmc_queue {
public:
    spmc_queue() = default;

    // Appends `value` (moved in) and wakes one thread blocked in wait_and_pop().
    void push(T&& value) {
        std::lock_guard<std::mutex> guard(mutex_);
        queue_.push(std::move(value));
        cv_.notify_one();
    }

    // Non-blocking pop. Moves the front element into `value` and returns true,
    // or returns false and leaves `value` untouched when the queue is empty.
    bool try_pop(T& value) {
        std::lock_guard<std::mutex> guard(mutex_);
        const bool has_item = !queue_.empty();
        if (has_item) {
            value = std::move(queue_.front());
            queue_.pop();
        }
        return has_item;
    }

    // Blocking pop. Sleeps until an element is available, then moves it into `value`.
    void wait_and_pop(T& value) {
        std::unique_lock<std::mutex> guard(mutex_);
        while (queue_.empty()) {
            cv_.wait(guard);  // loop guards against spurious wake-ups
        }
        value = std::move(queue_.front());
        queue_.pop();
    }

private:
    std::queue<T> queue_;          // pending elements, oldest at the front
    std::mutex mutex_;             // guards queue_
    std::condition_variable cv_;   // signalled by push()
};
// Worker thread function | |
void worker_thread(int id, spmc_queue<std::function<void()>>& task_queue, std::atomic<bool>& terminate_flag) { | |
while (!terminate_flag.load()) { | |
std::function<void()> task; | |
if (task_queue.try_pop(task)) { | |
task(); | |
} else { | |
std::this_thread::yield(); | |
} | |
} | |
} | |
// Bounded FIFO queue that is safe for any number of producer and consumer
// threads. Every operation takes an internal mutex.
//
// An earlier version coordinated producers and consumers with two atomics and
// acquire/release ordering. That scheme is only correct for a single producer
// and a single consumer. Two concurrent tryPop() calls could read the same slot
// (running one task twice), and two concurrent tryPush() calls could write the
// same slot. A mutex keeps the interface and makes every operation atomic.
template <typename T>
class MPMCQueue {
public:
    // Creates a queue that holds at most `size` elements.
    // Throws std::invalid_argument if `size` is zero (the index arithmetic
    // would otherwise divide by zero).
    MPMCQueue(std::size_t size) : buffer_(size) {
        if (size == 0) {
            throw std::invalid_argument("MPMCQueue: size must be greater than zero");
        }
    }

    // Copies `item` onto the back of the queue. Returns false and leaves the
    // queue unchanged when it already holds `size` elements.
    bool tryPush(const T& item) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (count_ == buffer_.size()) {
            return false;  // queue is full
        }
        buffer_[(head_ + count_) % buffer_.size()] = item;  // ring buffer wraps around
        ++count_;
        return true;
    }

    // Moves the front element into `item`. Returns false and leaves `item`
    // untouched when the queue is empty.
    bool tryPop(T& item) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (count_ == 0) {
            return false;  // queue is empty
        }
        item = std::move(buffer_[head_]);
        // Reset the slot so resources still held by the moved-from element
        // (e.g. a lambda's captures) are released now, not when the slot is reused.
        buffer_[head_] = T();
        head_ = (head_ + 1) % buffer_.size();
        --count_;
        return true;
    }

    // Returns true if the queue held no elements at the moment of the call.
    // Other threads may change that immediately afterwards.
    bool empty() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return count_ == 0;
    }

private:
    std::vector<T> buffer_;     // fixed-capacity ring storage
    std::size_t head_ = 0;      // index of the front element
    std::size_t count_ = 0;     // number of stored elements
    mutable std::mutex mutex_;  // guards all members above; mutable so empty() can lock
};
const int kMaxTasks = 1000; | |
class ThreadPool { | |
public: | |
explicit ThreadPool(std::size_t num_threads = std::thread::hardware_concurrency()) : | |
queue_(kMaxTasks), | |
stop_(false) | |
{ | |
try { | |
for (std::size_t i = 0; i < num_threads; ++i) { | |
threads_.emplace_back(&ThreadPool::worker_thread, this); | |
} | |
} catch (...) { | |
stop_ = true; | |
throw; | |
} | |
} | |
~ThreadPool() { | |
{ | |
std::unique_lock<std::mutex> lock(mutex_); | |
stop_ = true; | |
} | |
cv_.notify_all(); | |
for (auto& thread : threads_) { | |
thread.join(); | |
} | |
} | |
void submit(std::function<void()> task) { | |
if (queue_.tryPush(task)) | |
cv_.notify_one(); | |
} | |
/* | |
In this implementation, the method acquires a lock on the mutex and then waits | |
on the condition variable until the queue is empty, indicating that all submitted | |
tasks have completed. The queue_.empty() method is used to check whether the | |
queue is empty. This method is safe to call from multiple threads because it | |
uses atomic operations to access the read and write indices. When a task is | |
submitted to the queue, the worker threads will decrement the count of | |
unfinished tasks, so the queue_.empty() method will return true when all | |
tasks have been completed. | |
*/ | |
void wait() { | |
std::unique_lock<std::mutex> lock(mutex_); | |
cv_.wait(lock, [this]() { return queue_.empty(); }); | |
} | |
private: | |
void worker_thread() { | |
while (!stop_) { | |
std::function<void()> task; | |
if (queue_.tryPop(task)) { | |
task(); | |
} else { | |
std::unique_lock<std::mutex> lock(mutex_); | |
cv_.wait(lock, [this]() { return stop_ || !queue_.empty(); }); | |
} | |
} | |
} | |
MPMCQueue<std::function<void()>> queue_; | |
std::vector<std::thread> threads_; | |
std::mutex mutex_; | |
std::condition_variable cv_; | |
bool stop_; | |
}; | |
// Prints "Found text file: <file_path>" followed by a newline to std::cout, then flushes.
// This runs concurrently on pool threads. The whole line is therefore assembled
// first and written with a single insertion. Chaining several << operations
// lets output from different threads interleave in the middle of a line.
void print_file_name(const std::string& file_path) {
    const std::string line = "Found text file: " + file_path + "\n";
    std::cout << line << std::flush;
}
void recurse_directory(const std::string& directory_path, ThreadPool& thread_pool) { | |
for (const auto& entry : std::filesystem::directory_iterator(directory_path)) { | |
if (entry.is_directory()) { | |
//std::cout << "Recursing " << entry.path() << std::endl; | |
// Recursively process the subdirectory in a new task | |
auto path = entry.path(); | |
thread_pool.submit([path, &thread_pool] { | |
recurse_directory(path.string(), thread_pool); | |
}); | |
} else if (entry.is_regular_file() && entry.path().extension() == ".exr") { | |
std::cout << "Submitting " << entry.path() << std::endl; | |
// Print the file name in a new task | |
auto path = entry.path(); | |
thread_pool.submit([path] { | |
print_file_name(path.string()); | |
}); | |
} | |
else { | |
//std::cout << "Skipping entry " << entry.path() << std::endl; | |
} | |
} | |
//std::cout << "finished recursing directory\n"; | |
} | |
// Outcome of compareExrFiles(): whether exrinfo and nanoexrinfo printed the same report.
enum class FileComparisonResult {
    FilesDiffer,
    FilesIdentical
};

// Wraps `arg` in single quotes so a POSIX shell treats it as one literal word.
// Each embedded ' is written as '\'' (close quote, escaped quote, reopen quote).
// Double quotes are not enough, because the shell still expands $(), backticks
// and $VAR inside them.
static std::string quoteShellArgument(const std::string& arg) {
    std::string quoted;
    quoted.reserve(arg.size() + 2);
    quoted += '\'';
    for (const char c : arg) {
        if (c == '\'') {
            quoted += "'\\''";
        } else {
            quoted += c;
        }
    }
    quoted += '\'';
    return quoted;
}

// Runs `tool <filePath>` through popen() and returns everything it wrote to stdout.
// Throws std::runtime_error naming the tool and the file if the command cannot be
// started, its output cannot be read, or it exits with a non-zero status.
static std::string captureExrToolOutput(const std::string& tool,
                                        const std::filesystem::path& filePath) {
    const std::string command = tool + " " + quoteShellArgument(filePath.string());
    FILE* pipe = popen(command.c_str(), "r");
    if (!pipe) {
        throw std::runtime_error("Failed to execute " + tool + " command for file '" +
                                 filePath.string() + "'");
    }

    std::string output;
    char buffer[4096];
    try {
        // fread keeps embedded NUL bytes, which fgets + operator+= would truncate at.
        std::size_t count = 0;
        while ((count = fread(buffer, 1, sizeof(buffer), pipe)) > 0) {
            output.append(buffer, count);
        }
    } catch (...) {
        pclose(pipe);  // don't leak the pipe and child process if append() throws
        throw;
    }

    const bool readFailed = ferror(pipe) != 0;
    const int status = pclose(pipe);
    if (readFailed || status != 0) {
        throw std::runtime_error("Error executing " + tool + " command for file '" +
                                 filePath.string() + "' (status " + std::to_string(status) + ")");
    }
    return output;
}

// Runs `exrinfo` and `nanoexrinfo` on `filePath` and compares their stdout byte-for-byte.
// Returns FilesIdentical if the outputs match, otherwise FilesDiffer. Throws
// std::runtime_error (its message includes the filename) if either tool cannot be
// run or fails. The commands go through popen(), i.e. a POSIX shell.
FileComparisonResult compareExrFiles(const std::filesystem::path& filePath) {
    const std::string exrinfoOutput = captureExrToolOutput("exrinfo", filePath);
    const std::string nanoexrinfoOutput = captureExrToolOutput("nanoexrinfo", filePath);
    return exrinfoOutput == nanoexrinfoOutput ? FileComparisonResult::FilesIdentical
                                              : FileComparisonResult::FilesDiffer;
}
/*
Given a std::filesystem::path, write a function that invokes the shell
command exrinfo and also the shell command nanoexrinfo. The function
captures stdout from both programs and checks whether the outputs are
identical. It returns an enumeration whose values are
FilesDiffer and FilesIdentical. If errors are encountered, a std::exception
is thrown. The exception should include information on the error that
occurred, including the filename where appropriate.
*/
#ifdef _MSC_VER
#include <io.h>
#include <fcntl.h>
// On Windows the pipe functions are spelled _popen/_pclose. The pipe is
// switched to binary mode so output is returned byte-for-byte, without CRLF
// translation.
static FILE* open_command_pipe(const char* cmd) {
    FILE* pipe = _popen(cmd, "r");
    if (pipe) {
        _setmode(_fileno(pipe), _O_BINARY);
    }
    return pipe;
}
static int close_command_pipe(FILE* pipe) { return _pclose(pipe); }
#else
static FILE* open_command_pipe(const char* cmd) { return popen(cmd, "r"); }
static int close_command_pipe(FILE* pipe) { return pclose(pipe); }
#endif

// Runs `cmd` through the platform shell and returns everything it wrote to
// stdout. stderr is not captured.
//
// Throws:
//  - std::invalid_argument if `cmd` is null;
//  - std::runtime_error if the pipe cannot be opened or closed;
//  - std::runtime_error if the command exits with a non-zero status.
// A failing command is no longer reported as success with partial or empty
// output. Previously, two missing tools compared as "identical".
std::string execute_command(const char* cmd) {
    if (cmd == nullptr) {
        throw std::invalid_argument("Error: null command.");
    }

    FILE* pipe = open_command_pipe(cmd);
    if (!pipe) {
        throw std::runtime_error("Error: Failed to open pipe.");
    }

    std::string result;
    char buffer[4096];
    try {
        // fread keeps embedded NUL bytes, which fgets + operator+= would truncate at.
        std::size_t count = 0;
        while ((count = fread(buffer, 1, sizeof(buffer), pipe)) > 0) {
            result.append(buffer, count);
        }
    } catch (...) {
        close_command_pipe(pipe);  // don't leak the pipe if append() throws
        throw;
    }

    const int status = close_command_pipe(pipe);
    if (status < 0) {
        throw std::runtime_error("Error: Failed to close pipe.");
    }
    if (status != 0) {
        throw std::runtime_error("Error: Command exited with status " +
                                 std::to_string(status) + ": " + cmd);
    }
    return result;
}
/*
The program is in C++. It creates n worker threads (by default, as many as
std::thread::hardware_concurrency() reports). The threads listen to a
thread-safe MPMC queue on which tasks arrive. Shutdown is signalled with a stop
flag so that the main program can join the threads and quit.
It uses the thread pool to recurse a directory tree, preferably by dispatching
a single directory to each task and having each task dispatch a new task for
any subdirectories it finds.
If a task finds a file ending in ".exr", it dispatches a new task that prints
the file name.
*/
int main() { | |
try { | |
// Initialize thread pool with default number of threads (4) | |
ThreadPool thread_pool; | |
thread_pool.submit([] { | |
std::cout << "Hello world\n"; | |
}); | |
// Recursively process the current directory in a new task | |
thread_pool.submit([&] { | |
recurse_directory("/Users/nporcino/dev/openexr-images", thread_pool); | |
}); | |
// Wait for all tasks to finish before exiting | |
thread_pool.wait(); | |
} | |
catch (const std::exception& e) { | |
std::cerr << "Error: " << e.what() << std::endl; | |
return 1; | |
} | |
return 0; | |
} | |
int main2() { | |
try { | |
std::string exrinfo_output = execute_command("exrinfo somefile.exr"); | |
std::string nanoexrinfo_output = execute_command("nanoexrinfo somefile.exr"); | |
if (exrinfo_output == nanoexrinfo_output) { | |
std::cout << "Files are identical." << std::endl; | |
} else { | |
std::cout << "Files are different." << std::endl; | |
} | |
} catch (const std::exception& e) { | |
std::cerr << "Error: " << e.what() << std::endl; | |
return 1; | |
} | |
return 0; | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment