This is just a little cpp class to execute a function parallel with different parameters over and over again. It was developed to test some c++ features.
Compile: clang++-7 -Wall x.cpp -lpthread -std=c++17
#include <iostream> | |
#include <future> | |
#include <vector> | |
template <typename T> | |
T calc(T a, T b) { | |
T c = a + b; | |
for (T i = 0; i < 1000000; i++) { | |
c += b * i - 10; | |
} | |
return a + b + c; | |
}; | |
template <typename Result, typename... Parameters> | |
class Executor { | |
private: | |
Result (*func)(Parameters...); | |
std::vector<std::thread> threads; | |
typedef std::unique_ptr<std::tuple< | |
std::unique_ptr<std::promise<Result>>, | |
std::unique_ptr<std::tuple<Parameters...>> | |
>> vector_type; | |
std::vector<vector_type> todo; | |
std::mutex todo_mutex; | |
std::condition_variable condition; | |
bool data_ready = false; | |
bool shutdown = false; | |
void run() { | |
std::cout << "run thread: " << std::this_thread::get_id() << std::endl; | |
while (true) { | |
vector_type t; | |
{ | |
// Dont lock during function call | |
std::unique_lock<std::mutex> uk(todo_mutex); | |
condition.wait(uk, [this] { return data_ready; }); | |
if (todo.empty()) { | |
if (shutdown) { | |
return; | |
} | |
data_ready = false; | |
continue; | |
} | |
t = std::move(todo.back()); | |
todo.pop_back(); | |
} | |
auto [prom, param] = std::move(*t); | |
prom->set_value(std::apply(func, *param)); | |
}; | |
}; | |
std::vector<std::unique_ptr<std::promise<Result>>> promises; | |
public: | |
Executor(size_t num_threads, Result (*f)(Parameters...)): func(f) { | |
threads.reserve(num_threads); | |
for (size_t i = 0; i < num_threads; i++) { | |
threads.push_back(std::thread(&Executor::run, this)); | |
} | |
}; | |
~Executor() { | |
{ | |
std::lock_guard<std::mutex> lk(todo_mutex); | |
shutdown = true; | |
data_ready = true; | |
condition.notify_all(); | |
} | |
for (size_t i = 0; i < threads.size(); i++) { | |
threads[i].join(); | |
} | |
}; | |
std::future<Result> exec(Parameters... params) { | |
auto prom = std::make_unique<std::promise<Result>>(); | |
auto res = prom->get_future(); | |
auto save = std::make_unique<std::tuple< | |
std::unique_ptr<std::promise<Result>>, | |
std::unique_ptr<std::tuple<Parameters...>>>>( | |
std::make_tuple(std::move(prom), | |
std::make_unique<std::tuple<Parameters...>>( | |
std::make_tuple(params...)))); | |
std::lock_guard<std::mutex> lk(todo_mutex); | |
todo.push_back(std::move(save)); | |
data_ready = true; | |
condition.notify_one(); | |
return res; | |
} | |
}; | |
void fill(uint64_t runs, std::vector<std::future<float>>& results, Executor<float, float, float>& calcer) { | |
results.reserve(runs); | |
for (uint64_t i = 0; i < runs; i++) { | |
results.push_back(calcer.exec(i, runs)); | |
} | |
} | |
int main(int argc, char *argv[]) { | |
Executor<float, float, float> calcer(std::thread::hardware_concurrency(), calc); | |
std::cout << "main thread: " << std::this_thread::get_id() << std::endl; | |
uint64_t runs = 10000; | |
std::vector<std::future<float>> results1, results2; | |
auto future1 = std::async([&] { | |
fill(runs, results1, calcer); | |
}); | |
auto future2 = std::async([&] { | |
fill(runs, results2, calcer); | |
}); | |
future1.get(); | |
future2.get(); | |
for (uint64_t i = 0; i < runs; i++) { | |
results1[i].get(); | |
results2[i].get(); | |
} | |
return 0; | |
} |