Skip to content

Instantly share code, notes, and snippets.

@n7275
Last active February 26, 2022 18:38
Show Gist options
  • Save n7275/c6e84964fc44006576c368795943ca96 to your computer and use it in GitHub Desktop.
Better (but still broken) Thread Pool
#include <atomic>
#include <cmath>
#include <condition_variable>
#include <cstddef>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>
// Synthetic CPU-bound workload used to keep a worker thread busy.
// The dt argument only exists to match the job signature; it is never read.
void JobFunction(double dt) {
    const unsigned long long kIterations = 5000000ULL;
    double accumulator = 0.0;
    unsigned long long remaining = kIterations;
    // One rand() call and one cosine per iteration; the sum itself is discarded.
    while (remaining-- > 0) {
        accumulator += std::cos(static_cast<double>(rand())) / 100000000.0;
    }
    // Debug aid: uncomment to print the executing thread and the accumulated value.
    //std::thread::id threadID = std::this_thread::get_id();
    //std::cout << threadID << " " << accumulator << std::endl; //endl should flush
}
// Pool of worker threads that executes batches of jobs of the form void(double).
class ThreadPool {
    // Callable type of a single job; workers invoke it with the pool's dt.
    using JobFn = std::function<void(double)>;

public:
    ThreadPool();
    ~ThreadPool();

    // Hands a time step and a list of jobs (taken by value) to the workers.
    void StartWork(const double Setdt, const std::vector<JobFn> SetQueue);

private:
    // Entry point executed by every thread stored in workerPool.
    void workerThreadFunction();

    double dt;                            // value passed to each job
    std::atomic<bool> working;            // set when a batch is handed over, cleared by workers
    std::atomic<bool> terminate;          // set by the destructor, polled by workers
    std::vector<std::thread> workerPool;  // the worker threads
    std::vector<JobFn> queue;             // pending jobs; workers take from the back
    std::condition_variable cv;           // waited on together with runningLock
    std::mutex readQueueLock;             // held while the queue is replaced or popped
    std::mutex runningLock;               // mutex paired with cv
};
// Demo driver: builds one batch of identical jobs and runs it through the pool twice,
// pausing for a key press after each run.
int main() {
    ThreadPool pool; // constructing the pool starts its worker threads

    // List of every function that has to be called for one update.
    // In SPSDK this would be built once, when the linked lists are created
    // (i.e. hSystemRefreshQueue.push_back(runner->refresh) for each runner).
    const unsigned kJobCount = 500;
    const std::function<void(double)> job = JobFunction;
    std::vector<std::function<void(double)>> tempQueue(kJobCount, job);

    // Launch the batch; in SPSDK this call would replace hSystem->refresh(tfactor)
    // with pool.StartWork(tfactor, hSystemRefreshQueue).
    pool.StartWork(2.0, tempQueue);
    std::cin.get();

    pool.StartWork(1.0, tempQueue);
    std::cin.get();

    return 0;
}
// Creates the pool and starts its worker threads.
//
// One worker is started per hardware thread reported by
// std::thread::hardware_concurrency(). That function is allowed to return 0
// when the value cannot be determined, so at least one worker is always
// started; a pool without workers could never run a job.
//
// The threads are kept joinable (not detached) so that the destructor can wait
// for them: every worker holds a pointer to this object and must not outlive it.
ThreadPool::ThreadPool()
{
    dt = 0;
    working = false;
    terminate = false;

    unsigned int numThreads = std::thread::hardware_concurrency();
    if (numThreads == 0) {
        numThreads = 1;
    }

    workerPool.reserve(numThreads);
    try {
        for (unsigned int i = 0; i < numThreads; ++i) {
            workerPool.emplace_back(&ThreadPool::workerThreadFunction, this);
        }
    }
    catch (...) {
        // Starting a thread failed part-way through. The workers that were
        // already started must be stopped and joined here, because destroying
        // a joinable std::thread calls std::terminate.
        {
            std::lock_guard<std::mutex> lock(runningLock);
            terminate = true;
        }
        cv.notify_all();
        for (std::thread& worker : workerPool) {
            if (worker.joinable()) {
                worker.join();
            }
        }
        throw;
    }
}

// Stops the workers and waits until every one of them has exited.
//
// terminate is set while holding runningLock, the mutex the workers wait on,
// so a worker cannot miss the notification between checking the flag and
// going to sleep. A worker that is in the middle of a job finishes that job
// before it sees the flag.
ThreadPool::~ThreadPool()
{
    {
        std::lock_guard<std::mutex> lock(runningLock);
        terminate = true;
    }
    cv.notify_all();

    for (std::thread& worker : workerPool) {
        // joinable() is false for threads that were detached or already joined.
        if (worker.joinable()) {
            worker.join();
        }
    }
}
//copies a predetermined queue vector of jobs to be run
void ThreadPool::StartWork(const double Setdt, const std::vector<std::function<void(double)>> SetQueue) {
{
std::unique_lock<std::mutex> lock(readQueueLock);
dt = Setdt;
queue = SetQueue;
working = true;
}
std::unique_lock<std::mutex> RunningLock(runningLock);
cv.notify_all();
cv.wait(RunningLock, [this]() {return (!working); });
std::cout << "Done Work!" << std::endl;
}
// Body of every worker thread: sleep until a job is available, run jobs one at
// a time, and return once terminate is set.
//
// Locking:
//  - runningLock is the mutex used with cv; terminate is checked and all
//    notifications are sent while it is held, so no wake-up can be lost.
//  - readQueueLock is held for every access to queue and dt, including the
//    emptiness check.
//  - When both are needed they are taken in the order runningLock -> readQueueLock.
//
// When the queue is found empty, `working` is cleared and waiters on cv are
// notified. This happens only on the true -> false transition, so idle workers
// do not keep waking each other up.
//
// Jobs are invoked with no lock held. There is no try/catch around the call,
// so an exception thrown by a job escapes the thread function.
void ThreadPool::workerThreadFunction()
{
    for (;;) {
        std::function<void(double)> Job;
        double jobDt = 0;
        {
            std::unique_lock<std::mutex> RunningLock(runningLock);
            for (;;) {
                if (terminate) {
                    return;
                }

                bool queueDrained = false;
                {
                    std::lock_guard<std::mutex> lock(readQueueLock);
                    if (!queue.empty()) {
                        Job = std::move(queue.back());
                        queue.pop_back();
                        // Read dt under the same lock that guards its writer.
                        jobDt = dt;
                        break;
                    }
                    // Clearing the flag in the same critical section that saw
                    // the empty queue keeps it consistent with a batch that is
                    // published under readQueueLock.
                    queueDrained = working.exchange(false);
                }

                if (queueDrained) {
                    cv.notify_all();
                }
                // Spurious wake-ups are handled by looping back to the checks.
                cv.wait(RunningLock);
            }
        }
        Job(jobDt);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment