Skip to content

Instantly share code, notes, and snippets.

@meshula
Last active March 15, 2023 01:44
Show Gist options
  • Save meshula/9af43ea1097881ccea5b7b9cbc9fa839 to your computer and use it in GitHub Desktop.
Save meshula/9af43ea1097881ccea5b7b9cbc9fa839 to your computer and use it in GitHub Desktop.
thread pool to test exr utilities
#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <filesystem>
#include <fstream>
#include <functional>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>
// Mutex-protected FIFO queue intended for one producer and many consumers.
//
// Every operation takes the internal mutex, so any thread may call any member.
// Consumers can either poll (try_pop) or block until an item arrives
// (wait_and_pop).
template <typename T>
class spmc_queue {
public:
    spmc_queue() {}

    // Appends `value` (moved in) and wakes one consumer blocked in wait_and_pop.
    void push(T&& value) {
        std::lock_guard<std::mutex> guard(mutex_);
        queue_.push(std::move(value));
        cv_.notify_one();
    }

    // Non-blocking pop. Moves the oldest item into `value` and returns true;
    // returns false and leaves `value` untouched when the queue is empty.
    bool try_pop(T& value) {
        std::lock_guard<std::mutex> guard(mutex_);
        const bool has_item = !queue_.empty();
        if (has_item) {
            value = std::move(queue_.front());
            queue_.pop();
        }
        return has_item;
    }

    // Blocking pop. Sleeps on the condition variable until an item is
    // available, then moves the oldest item into `value`.
    void wait_and_pop(T& value) {
        std::unique_lock<std::mutex> guard(mutex_);
        while (queue_.empty()) {
            cv_.wait(guard);
        }
        value = std::move(queue_.front());
        queue_.pop();
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cv_;
};
// Polling worker loop for an spmc_queue of tasks.
//
// Repeatedly tries to take a task from `task_queue` and runs it; when the
// queue is empty the thread yields its time slice instead of blocking.
// The loop ends once `terminate_flag` is observed to be true (checked before
// each attempt). `id` is accepted but not used by the loop.
void worker_thread(int id, spmc_queue<std::function<void()>>& task_queue, std::atomic<bool>& terminate_flag) {
    for (;;) {
        if (terminate_flag.load()) {
            break;
        }
        std::function<void()> job;
        const bool got_job = task_queue.try_pop(job);
        if (!got_job) {
            std::this_thread::yield();
            continue;
        }
        job();
    }
}
/// Bounded multi-producer/multi-consumer FIFO queue backed by a ring buffer.
///
/// All operations are serialised by an internal mutex, so any number of
/// threads may call tryPush(), tryPop() and empty() concurrently. (Unguarded
/// read/write indices are only safe for a single producer and a single
/// consumer: two consumers could pop the same slot and two producers could
/// overwrite each other.)
///
/// T must be default-constructible (slots are pre-allocated) and assignable.
template <typename T>
class MPMCQueue {
public:
    /// @param size Number of ring-buffer slots. One slot is always kept free so
    ///             that "full" can be told apart from "empty"; the queue
    ///             therefore holds at most `size - 1` items, and a size of 0
    ///             or 1 yields a queue that rejects every push.
    MPMCQueue(size_t size) : buffer(size), readIndex(0), writeIndex(0) {}

    /// Copies `item` into the queue.
    /// @return true on success, false if the queue is full (nothing is stored).
    bool tryPush(const T& item) {
        std::lock_guard<std::mutex> lock(stateMutex);
        if (buffer.empty()) {
            return false; // zero capacity; also avoids a modulo by zero below
        }
        const size_t nextWriteIndex = getNextIndex(writeIndex);
        if (nextWriteIndex == readIndex) {
            return false; // queue is full
        }
        buffer[writeIndex] = item;
        writeIndex = nextWriteIndex;
        return true;
    }

    /// Moves the oldest item into `item`.
    /// @return true on success, false if the queue is empty (`item` untouched).
    bool tryPop(T& item) {
        std::lock_guard<std::mutex> lock(stateMutex);
        if (readIndex == writeIndex) {
            return false; // queue is empty
        }
        // Move rather than copy so the slot does not keep a second copy of the
        // item (e.g. a std::function and its captures) until it is overwritten.
        item = std::move(buffer[readIndex]);
        readIndex = getNextIndex(readIndex);
        return true;
    }

    /// @return true when no items are queued. The indices wrap around the ring,
    ///         and equal read/write indices always mean "empty" because one
    ///         slot is never filled. With concurrent producers/consumers the
    ///         answer may already be stale when the caller looks at it.
    bool empty() const {
        std::lock_guard<std::mutex> lock(stateMutex);
        return readIndex == writeIndex;
    }

private:
    std::vector<T> buffer;
    size_t readIndex;   // next slot to pop; guarded by stateMutex
    size_t writeIndex;  // next slot to fill; guarded by stateMutex
    mutable std::mutex stateMutex;

    // Successor of `index` on the ring. Callers guarantee buffer is non-empty.
    size_t getNextIndex(size_t index) const {
        return (index + 1) % buffer.size();
    }
};
// Number of slots requested for the ThreadPool task queue.
constexpr int kMaxTasks = 1000;
/// Fixed-size pool of worker threads that execute queued std::function tasks.
///
/// Thread-safety: submit() and wait() may be called from any thread, and
/// submit() may be called from inside a running task. Every access to the
/// task queue, the stop flag and the outstanding-task counter happens while
/// holding mutex_, so the pool does not rely on the queue being thread-safe.
///
/// A task that lets an exception escape terminates the process, because the
/// exception leaves the worker's thread function.
class ThreadPool {
public:
    /// Starts the worker threads.
    /// @param num_threads Number of workers. A value of 0 (which
    ///        std::thread::hardware_concurrency() may report) is replaced by 4
    ///        so that the pool can always make progress.
    /// @throws whatever std::thread construction throws; workers that were
    ///         already started are stopped and joined before rethrowing.
    explicit ThreadPool(std::size_t num_threads = std::thread::hardware_concurrency()) :
        queue_(kMaxTasks),
        stop_(false),
        pending_(0)
    {
        if (num_threads == 0) {
            num_threads = 4;
        }
        try {
            threads_.reserve(num_threads);
            for (std::size_t i = 0; i < num_threads; ++i) {
                threads_.emplace_back(&ThreadPool::worker_thread, this);
            }
        } catch (...) {
            // The destructor does not run for a partially constructed object,
            // and destroying a joinable std::thread calls std::terminate.
            shutdown();
            throw;
        }
    }

    /// Signals the workers to stop and joins them. Tasks still waiting in the
    /// queue are discarded; call wait() first to let them run.
    ~ThreadPool() {
        shutdown();
    }

    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    /// Queues `task` for execution on a worker thread.
    ///
    /// If the bounded queue is full, the task is executed synchronously on the
    /// calling thread instead of being dropped. Running it inline (rather than
    /// blocking) keeps tasks that submit further tasks from deadlocking the
    /// pool.
    void submit(std::function<void()> task) {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            if (queue_.tryPush(task)) {
                ++pending_;
                lock.unlock();
                // The push happened under mutex_, so a worker that has just
                // evaluated its wait predicate cannot miss this notification.
                cv_.notify_one();
                return;
            }
        }
        task();
    }

    /// Blocks until every queued task has finished, including tasks that were
    /// submitted by other tasks while waiting. Must not be called from inside
    /// a task: the calling task itself would never be counted as finished.
    void wait() {
        std::unique_lock<std::mutex> lock(mutex_);
        done_cv_.wait(lock, [this]() { return pending_ == 0; });
    }

private:
    // Worker loop: sleep until there is work or a stop request, run one task,
    // then report its completion.
    void worker_thread() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this]() { return stop_ || !queue_.empty(); });
                if (stop_) {
                    return;
                }
                if (!queue_.tryPop(task)) {
                    continue;
                }
            }
            task();
            // Release the task's captured state before announcing completion,
            // so wait() returning implies the task object is gone.
            task = nullptr;
            {
                std::lock_guard<std::mutex> lock(mutex_);
                --pending_;
                if (pending_ == 0) {
                    done_cv_.notify_all();
                }
            }
        }
    }

    // Requests termination and joins every worker that was started.
    void shutdown() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        cv_.notify_all();
        for (auto& thread : threads_) {
            if (thread.joinable()) {
                thread.join();
            }
        }
    }

    MPMCQueue<std::function<void()>> queue_;
    std::vector<std::thread> threads_;
    std::mutex mutex_;
    std::condition_variable cv_;       // signalled when work arrives or on stop
    std::condition_variable done_cv_;  // signalled when pending_ drops to zero
    bool stop_;                        // guarded by mutex_
    std::size_t pending_;              // queued + running tasks; guarded by mutex_
};
// Writes "Found text file: <file_path>" as one line to standard output and
// flushes the stream.
void print_file_name(const std::string& file_path) {
    std::ostream& out = std::cout;
    out << "Found text file: ";
    out << file_path;
    out << std::endl;
}
// Scans one directory level and fans the work out to `thread_pool`.
//
// For every entry directly inside `directory_path`:
//   - a directory is handed to the pool as a new recurse_directory task;
//   - a regular file whose extension is ".exr" is announced on stdout and a
//     print_file_name task is submitted for it;
//   - anything else is ignored.
// The tasks capture `thread_pool` by reference, so the pool must outlive them.
void recurse_directory(const std::string& directory_path, ThreadPool& thread_pool) {
    for (const auto& item : std::filesystem::directory_iterator(directory_path)) {
        if (item.is_directory()) {
            // Descend into the subdirectory from a separate task.
            thread_pool.submit([subdir = item.path(), &thread_pool] {
                recurse_directory(subdir.string(), thread_pool);
            });
            continue;
        }

        if (!item.is_regular_file() || item.path().extension() != ".exr") {
            continue;
        }

        std::cout << "Submitting " << item.path() << std::endl;
        // Report the file name from a separate task.
        thread_pool.submit([file = item.path()] {
            print_file_name(file.string());
        });
    }
}
// Outcome of comparing the textual output of exrinfo and nanoexrinfo.
enum class FileComparisonResult {
    FilesDiffer,    // the two tools printed different output
    FilesIdentical  // the two tools printed byte-identical output
};

// Quotes `arg` so the shell hands it to the child process as one literal word.
static std::string exrShellQuote(const std::string& arg) {
#ifdef _WIN32
    // cmd.exe does not understand single quotes; keep plain double quoting.
    return "\"" + arg + "\"";
#else
    // Inside single quotes a POSIX shell treats every character literally, so
    // only the single quote itself needs care: close, escape it, reopen.
    std::string quoted = "'";
    for (const char c : arg) {
        if (c == '\'') {
            quoted += "'\\''";
        } else {
            quoted += c;
        }
    }
    quoted += '\'';
    return quoted;
#endif
}

// Runs `tool <file>` through the shell and returns everything it wrote to
// stdout. Throws std::runtime_error (naming the tool and the file) if the
// command cannot be started or does not exit with status 0.
static std::string exrRunToolOnFile(const std::string& tool, const std::string& file) {
    const std::string command = tool + " " + exrShellQuote(file);
    FILE* pipe = popen(command.c_str(), "r");
    if (!pipe) {
        throw std::runtime_error("Failed to execute " + tool + " command for file: " + file);
    }
    std::string output;
    try {
        char buffer[4096];
        std::size_t bytesRead = 0;
        // fread + append(count) keeps embedded NUL bytes that fgets would drop.
        while ((bytesRead = fread(buffer, 1, sizeof(buffer), pipe)) > 0) {
            output.append(buffer, bytesRead);
        }
    } catch (...) {
        pclose(pipe); // do not leak the pipe/child if appending throws
        throw;
    }
    if (pclose(pipe) != 0) {
        throw std::runtime_error("Error executing " + tool + " command for file: " + file);
    }
    return output;
}

// Runs exrinfo and nanoexrinfo on `filePath` and compares their stdout.
//
// Returns FilesIdentical when both outputs match byte for byte, FilesDiffer
// otherwise.
// Throws std::runtime_error, with a message that names the failing tool and
// the file, when either command cannot be started or exits with a non-zero
// status.
FileComparisonResult compareExrFiles(const std::filesystem::path& filePath) {
    const std::string file = filePath.string();
    const std::string exrinfoOutput = exrRunToolOnFile("exrinfo", file);
    const std::string nanoexrinfoOutput = exrRunToolOnFile("nanoexrinfo", file);
    return exrinfoOutput == nanoexrinfoOutput ? FileComparisonResult::FilesIdentical
                                              : FileComparisonResult::FilesDiffer;
}
/*
Given a std::filesystem::path, write a function that invokes the shell
command exrinfo and also the shell command nanoexrinfo. The function
captures stdout from both programs and checks whether the output is
identical. It returns an enumeration whose values are
FilesDiffer and FilesIdentical. If errors are encountered, a std::exception
is thrown. The exception should include information on the error that
occurred, including the filename if appropriate.
*/
/*
Platform shims for running a shell command and reading its stdout.
With MSVC the pipe functions are spelled _popen/_pclose (declared in
<stdio.h>); <io.h> and <fcntl.h> are needed only for _setmode and _O_BINARY,
which switch the pipe to binary mode so the output is not newline-translated.
Elsewhere the POSIX popen/pclose are used directly.
*/
#ifdef _MSC_VER
#include <io.h>
#include <fcntl.h>
static FILE* open_command_pipe(const char* cmd) {
    FILE* pipe = _popen(cmd, "r");
    if (pipe) {
        // Set the pipe to be binary mode
        _setmode(_fileno(pipe), _O_BINARY);
    }
    return pipe;
}
static int close_command_pipe(FILE* pipe) {
    return _pclose(pipe);
}
#else
static FILE* open_command_pipe(const char* cmd) {
    return popen(cmd, "r");
}
static int close_command_pipe(FILE* pipe) {
    return pclose(pipe);
}
#endif

// Runs `cmd` through the shell and returns everything it wrote to stdout.
//
// Throws std::runtime_error when `cmd` is null, when the pipe cannot be
// opened or closed, or when the command finishes with a non-zero status.
// The last case matters to callers that compare outputs: without it, two
// commands that both fail to run would both yield "" and look identical.
std::string execute_command(const char* cmd) {
    if (!cmd) {
        throw std::runtime_error("Error: Null command.");
    }
    // Open the command for reading
    FILE* pipe = open_command_pipe(cmd);
    if (!pipe) {
        throw std::runtime_error("Error: Failed to open pipe.");
    }
    std::string result;
    try {
        char buffer[4096];
        std::size_t bytesRead = 0;
        // fread + append(count) keeps embedded NUL bytes that fgets would drop.
        while ((bytesRead = fread(buffer, 1, sizeof(buffer), pipe)) > 0) {
            result.append(buffer, bytesRead);
        }
    } catch (...) {
        close_command_pipe(pipe); // do not leak the pipe if appending throws
        throw;
    }
    // Close the pipe
    const int status = close_command_pipe(pipe);
    if (status < 0) {
        throw std::runtime_error("Error: Failed to close pipe.");
    }
    if (status != 0) {
        throw std::runtime_error("Error: Command exited with a non-zero status: " + std::string(cmd));
    }
    return result;
}
/*
The program is in C++. It is going to create n threads, with a default of 4.
The threads will listen to a threadsafe MPMC queue. Tasks will come in on the
queue, either an instruction to do something, or an instruction to terminate
immediately so the main program can join the threads and quit.
It uses the thread pool to recurse the current directory, preferably by
dispatching a single directory to a task, and then having each task also
dispatch a task for any directories it finds.
If the task finds a file ending in ".txt" it should dispatch a new task that
prints the file name.
*/
int main() {
try {
// Initialize thread pool with default number of threads (4)
ThreadPool thread_pool;
thread_pool.submit([] {
std::cout << "Hello world\n";
});
// Recursively process the current directory in a new task
thread_pool.submit([&] {
recurse_directory("/Users/nporcino/dev/openexr-images", thread_pool);
});
// Wait for all tasks to finish before exiting
thread_pool.wait();
}
catch (const std::exception& e) {
std::cerr << "Error: " << e.what() << std::endl;
return 1;
}
return 0;
}
// Alternate entry point: runs exrinfo and nanoexrinfo on "somefile.exr" via
// execute_command and reports whether their outputs match.
// Returns 0 after printing the verdict, or 1 after printing the message of any
// std::exception raised along the way.
int main2() {
    try {
        const std::string reference_output = execute_command("exrinfo somefile.exr");
        const std::string candidate_output = execute_command("nanoexrinfo somefile.exr");
        const bool outputs_match = (reference_output == candidate_output);
        std::cout << (outputs_match ? "Files are identical." : "Files are different.")
                  << std::endl;
    } catch (const std::exception& e) {
        std::cerr << "Error: " << e.what() << std::endl;
        return 1;
    }
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment