@brandsimon
Last active July 12, 2020 19:46
Async-executor for multiple threads

This is just a little C++ class to execute a function in parallel with different parameters over and over again. It was developed to test some C++ features.

Compile: clang++-7 -Wall x.cpp -lpthread -std=c++17
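
Usage boils down to constructing an Executor with a worker count and a free function, then calling exec() to queue a call and get a std::future for its result. A minimal sketch (the add function and the thread count of 4 are made up for illustration; the Executor class from the listing below is assumed to be in scope):

// Hypothetical example function; any free function matching the template
// parameters of Executor works.
int add(int a, int b) {
	return a + b;
}

int main() {
	Executor<int, int, int> ex(4, add);   // 4 worker threads running add
	std::future<int> f = ex.exec(1, 2);   // queue one call of add(1, 2)
	std::cout << f.get() << std::endl;    // blocks until a worker set the result
	return 0;                             // ~Executor joins the worker threads
}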

#include <condition_variable>
#include <cstdint>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <tuple>
#include <vector>

// Toy workload: burns some CPU time so the parallel execution is measurable.
template <typename T>
T calc(T a, T b) {
	T c = a + b;
	for (T i = 0; i < 1000000; i++) {
		c += b * i - 10;
	}
	return a + b + c;
}

// Simple thread pool: num_threads workers pull queued calls of `func`
// from `todo` and fulfill the matching promise with the result.
template <typename Result, typename... Parameters>
class Executor {
private:
	Result (*func)(Parameters...);
	std::vector<std::thread> threads;
	// One queued task: the promise to fulfill plus the packed call arguments.
	typedef std::unique_ptr<std::tuple<
		std::unique_ptr<std::promise<Result>>,
		std::unique_ptr<std::tuple<Parameters...>>
	>> vector_type;
	std::vector<vector_type> todo;
	std::mutex todo_mutex;
	std::condition_variable condition;
	bool data_ready = false;
	bool shutdown = false;

	// Worker loop: take tasks until shutdown is requested and the queue is empty.
	void run() {
		std::cout << "run thread: " << std::this_thread::get_id() << std::endl;
		while (true) {
			vector_type t;
			{
				// Don't hold the lock during the function call.
				std::unique_lock<std::mutex> uk(todo_mutex);
				condition.wait(uk, [this] { return data_ready; });
				if (todo.empty()) {
					if (shutdown) {
						return;
					}
					data_ready = false;
					continue;
				}
				t = std::move(todo.back());
				todo.pop_back();
			}
			auto [prom, param] = std::move(*t);
			prom->set_value(std::apply(func, *param));
		}
	}

public:
	// Start num_threads worker threads that run queued calls of f.
	Executor(size_t num_threads, Result (*f)(Parameters...)): func(f) {
		threads.reserve(num_threads);
		for (size_t i = 0; i < num_threads; i++) {
			threads.emplace_back(&Executor::run, this);
		}
	}

	// Request shutdown; workers finish all queued tasks before they exit.
	~Executor() {
		{
			std::lock_guard<std::mutex> lk(todo_mutex);
			shutdown = true;
			data_ready = true;
			condition.notify_all();
		}
		for (size_t i = 0; i < threads.size(); i++) {
			threads[i].join();
		}
	}

	// Queue one call of func(params...) and return a future for its result.
	std::future<Result> exec(Parameters... params) {
		auto prom = std::make_unique<std::promise<Result>>();
		auto res = prom->get_future();
		auto save = std::make_unique<std::tuple<
			std::unique_ptr<std::promise<Result>>,
			std::unique_ptr<std::tuple<Parameters...>>>>(
			std::make_tuple(std::move(prom),
				std::make_unique<std::tuple<Parameters...>>(
					std::make_tuple(params...))));
		std::lock_guard<std::mutex> lk(todo_mutex);
		todo.push_back(std::move(save));
		data_ready = true;
		condition.notify_one();
		return res;
	}
};

// Queue `runs` calculations on the executor and collect their futures.
void fill(uint64_t runs, std::vector<std::future<float>>& results,
          Executor<float, float, float>& calcer) {
	results.reserve(runs);
	for (uint64_t i = 0; i < runs; i++) {
		results.push_back(calcer.exec(i, runs));
	}
}

int main(int argc, char *argv[]) {
	Executor<float, float, float> calcer(std::thread::hardware_concurrency(), calc);
	std::cout << "main thread: " << std::this_thread::get_id() << std::endl;
	uint64_t runs = 10000;
	std::vector<std::future<float>> results1, results2;
	// Two producers queue work concurrently; the executor's workers consume it.
	auto future1 = std::async([&] {
		fill(runs, results1, calcer);
	});
	auto future2 = std::async([&] {
		fill(runs, results2, calcer);
	});
	future1.get();
	future2.get();
	// Wait for every queued calculation to finish.
	for (uint64_t i = 0; i < runs; i++) {
		results1[i].get();
		results2[i].get();
	}
	return 0;
}