Skip to content

Instantly share code, notes, and snippets.

@mkolod
Last active December 14, 2020 17:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mkolod/bb1b14a5efb28ad39c5fd91646f32779 to your computer and use it in GitHub Desktop.
Save mkolod/bb1b14a5efb28ad39c5fd91646f32779 to your computer and use it in GitHub Desktop.
#include <chrono>
#include <cmath>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
template<typename Ret, typename Fun, typename Arg>
class ReusableWorkerThreadWithFuture {
public:
ReusableWorkerThreadWithFuture(Fun fun, bool spin = false) : fun_(fun), done(true), ready(false), spin_(spin), spin_done(true), spin_ready(false) {
t = std::move(std::thread(&ReusableWorkerThreadWithFuture::worker, this));
t.detach();
}
void submit(Arg arg) {
if (spin_) {
while (!spin_done) {}
arg_ = arg;
spin_done = false;
spin_ready = true;
} else {
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [&] { return done; });
arg_ = arg;
done = false;
ready = true;
lk.unlock();
cv.notify_one();
}
}
Ret get() {
// wait for done
if (spin_) {
while (!spin_done || spin_ready) {}
} else {
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [&] { return done && !ready; });
lk.unlock();
cv.notify_one();
}
return ret;
}
void worker() {
while (true) {
if (spin_) {
while (spin_done || !spin_ready) {}
ret = fun_(arg_);
spin_done = true;
spin_ready = false;
} else {
std::unique_lock<std::mutex> lk(m);
// Ready means lambda and data are provided
cv.wait(lk, [&] { return !done && ready; });
ret = fun_(arg_);
done = true;
ready = false;
lk.unlock();
cv.notify_all();
}
}
}
private:
std::thread t;
Fun fun_;
Arg arg_;
Ret ret;
std::mutex m;
std::condition_variable cv;
bool done;
bool ready;
bool spin_;
std::atomic<bool> spin_done;
std::atomic<bool> spin_ready;
};
int main() {
using namespace std::chrono;
const int num_iter = 1000;
auto fun = [](int i) {
std::this_thread::sleep_for(microseconds(1));
return i * 2;
};
const int arg = 3;
const int expected = 6;
ReusableWorkerThreadWithFuture<int, decltype(fun), int> t(fun);
ReusableWorkerThreadWithFuture<int, decltype(fun), int> t_spin(fun, true);
// Wait for the ReusableWorkerThread's internal thread to start
std::this_thread::sleep_for(seconds(1));
std::cout << "Starting worker thread." << std::endl;
auto worker_start = steady_clock::now();
int result = 0;
for (int i = 0; i < num_iter; ++i) {
result = 0;
t.submit(arg);
std::this_thread::sleep_for(microseconds(1));
result = t.get();
if (result != expected) {
throw std::runtime_error("Incorrect result from worker thread");
}
}
auto worker_end = steady_clock::now();
auto worker_duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(worker_end - worker_start).count();
std::cout << "Done with worker thread. Starting worker with spin-lock." << std::endl;
auto worker_spin_start = steady_clock::now();
for (int i = 0; i < num_iter; ++i) {
result = 0;
t_spin.submit(arg);
std::this_thread::sleep_for(microseconds(1));
result = t_spin.get();
if (result != expected) {
throw std::runtime_error("Incorrect result from worker thread");
}
}
auto worker_spin_end = steady_clock::now();
auto worker_spin_duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(worker_spin_end - worker_spin_start).count();
std::cout << "Done with spin-worker thread. Starting async." << std::endl;
auto async_start = steady_clock::now();
for (int i = 0; i < num_iter; ++i) {
result = 0;
auto a = std::async(std::launch::async, fun, arg);
std::this_thread::sleep_for((microseconds(1)));
result = a.get();
if (result != expected) {
throw std::runtime_error("Incorrect result from async");
}
}
auto async_end = steady_clock::now();
auto async_duration_ms = std::chrono::duration_cast<std::chrono::milliseconds>(async_end - async_start).count();
std::cout << "Done with async." << std::endl;
std::cout << "Worker duration (ms): " << worker_duration_ms << std::endl;
std::cout << "Worker with spin duration (ms): " << worker_spin_duration_ms << std::endl;
std::cout << "Async duration (ms): " << async_duration_ms << std::endl;
return 0;
}
@mkolod
Copy link
Author

mkolod commented Dec 12, 2020

Sample output:

Starting worker thread.
Done with worker thread. Starting worker with spin-lock.
Done with spin-worker thread. Starting async.
Done with async.
Worker duration (ms): 13
Worker with spin duration (ms): 7
Async duration (ms): 28

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment