Last active
February 12, 2022 02:54
-
-
Save ugovaretto/fb761c536a8096d3b8fff13f734654d4 to your computer and use it in GitHub Desktop.
Task executor and wrapper for concurrent access to variable
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Author: Ugo Varetto
// Implementation of task-based concurrency (similar to Java's Executor)
// and a wrapper for concurrent access to a variable
// g++ task-based-executor-concurrent-generic.cpp -std=c++11 -pthread
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <cstdlib>  // EXIT_*, atoi
#include <deque>
#include <exception>
#include <functional>
#include <future>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
//------------------------------------------------------------------------------ | |
// Synchronized FIFO queue (could be an inner class inside Executor).
// - Push:  acquires the lock, inserts at the front and notifies one waiter.
// - Pop:   acquires the lock, waits while the queue is empty, then removes
//          and returns the element at the back (i.e. the oldest one).
// - Clear: acquires the lock and discards every queued element.
template <typename T>
class SyncQueue {
public:
    // Enqueue a copy of e and wake one thread blocked in Pop().
    void Push(const T& e) {
        // simple scoped lock: mutex acquired in the constructor,
        // released in the destructor
        std::lock_guard<std::mutex> guard(mutex_);
        queue_.push_front(e);
        cond_.notify_one();
    }

    // Enqueue e by moving it (needed for move-only element types) and wake
    // one thread blocked in Pop().
    void Push(T&& e) {
        std::lock_guard<std::mutex> guard(mutex_);
        queue_.push_front(std::move(e));
        cond_.notify_one();
    }

    // Remove and return the oldest element; blocks while the queue is empty.
    T Pop() {
        // a unique_lock is required because wait() must be able to release
        // and re-acquire the mutex
        std::unique_lock<std::mutex> lock(mutex_);
        // the predicate form re-checks the condition after every wake-up
        cond_.wait(lock, [this] { return !queue_.empty(); });
        T e = std::move(queue_.back());
        queue_.pop_back();
        return e;
    }

    // Discard all queued elements. The lock is taken so that clearing cannot
    // race with a concurrent Push() or Pop() on the same deque.
    void Clear() {
        std::lock_guard<std::mutex> guard(mutex_);
        queue_.clear();
    }

private:
    std::deque<T> queue_;           // front = newest, back = oldest
    std::mutex mutex_;              // guards queue_
    std::condition_variable cond_;  // signalled on every Push
};
//------------------------------------------------------------------------------ | |
// Type-erased interface for the callable objects stored in the task queue.
struct ICaller {
    // True for a default-constructed caller, i.e. one with no callable.
    virtual bool Empty() const = 0;
    // Run the stored callable.
    virtual void Invoke() = 0;
    virtual ~ICaller() = default;
};

// Callable object stored in the queue shared among threads. The arguments are
// bound at construction time; the outcome of the call (returned value or
// thrown exception) is delivered through the future returned by GetFuture().
template <typename ResultType>
class Caller : public ICaller {
public:
    // Bind f to args; the bound copies are stored inside this object.
    template <typename F, typename... Args>
    Caller(F&& f, Args&&... args)
        : f_(std::bind(std::forward<F>(f), std::forward<Args>(args)...)),
          empty_(false) {}

    // Empty caller: holds no callable, Empty() returns true.
    Caller() : empty_(true) {}

    // Future associated with the result of Invoke().
    std::future<ResultType> GetFuture() { return p_.get_future(); }

    // Call the bound function and store its result in the promise; any
    // exception thrown by the call is stored in the promise instead.
    void Invoke() override {
        try {
            // hand the returned temporary straight to the promise so that
            // move-only result types work and no extra copy is made
            p_.set_value(f_());
        } catch (...) {
            p_.set_exception(std::current_exception());
        }
    }

    bool Empty() const override { return empty_; }

private:
    std::promise<ResultType> p_;
    std::function<ResultType()> f_;
    bool empty_;
};

// Specialization for void return type: there is no value to store, the
// promise is only marked as ready.
template <>
class Caller<void> : public ICaller {
public:
    // Bind f to args; forwards like the primary template instead of copying.
    template <typename F, typename... Args>
    Caller(F&& f, Args&&... args)
        : f_(std::bind(std::forward<F>(f), std::forward<Args>(args)...)),
          empty_(false) {}

    // Empty caller: holds no callable, Empty() returns true.
    Caller() : empty_(true) {}

    // Future that becomes ready once Invoke() has run.
    std::future<void> GetFuture() { return p_.get_future(); }

    // Call the bound function, then mark the promise ready; any exception
    // thrown by the call is stored in the promise instead.
    void Invoke() override {
        try {
            f_();
            p_.set_value();
        } catch (...) {
            p_.set_exception(std::current_exception());
        }
    }

    bool Empty() const override { return empty_; }

private:
    std::promise<void> p_;
    std::function<void()> f_;
    bool empty_;
};
//------------------------------------------------------------------------------ | |
// task executor: asynchronously execute callable objects. Specify the max | |
// number of threads to use at Executor construction time; threads are started | |
// in the constructor and joined in the destructor | |
class Executor { | |
typedef SyncQueue<std::unique_ptr<ICaller> > Queue; | |
typedef std::vector<std::thread> Threads; | |
public: | |
Executor(int numthreads = std::thread::hardware_concurrency()) { | |
StartThreads(numthreads); | |
} | |
#ifdef USE_RESULT_OF | |
// deferred call to f with args parameters | |
// 1. all the arguments are bound to a function object taking zero | |
// parameters | |
// which is put into the shared queue | |
// 2. std::future is returned | |
template <typename F, typename... Args> | |
auto operator()(F&& f, Args... args) | |
-> std::future<typename std::result_of<F(Args...)>::type> { | |
if (threads_.empty()) throw std::logic_error("No active threads"); | |
typedef typename std::result_of<F(Args...)>::type ResultType; | |
Caller<ResultType>* c = new Caller<ResultType>( | |
std::forward<F>(f), std::forward<Args>(args)...); | |
std::future<ResultType> ft = c->GetFuture(); | |
queue_.Push(c); | |
return ft; | |
} | |
#else | |
template <typename F, typename... Args> | |
auto operator()(F&& f, Args... args) -> std::future<decltype(f(args...))> { | |
if (threads_.empty()) throw std::logic_error("No active threads"); | |
typedef decltype(f(args...)) ResultType; | |
Caller<ResultType>* c = new Caller<ResultType>( | |
std::forward<F>(f), std::forward<Args>(args)...); | |
std::future<ResultType> ft = c->GetFuture(); | |
queue_.Push(std::unique_ptr<ICaller>(c)); | |
return ft; | |
} | |
#endif | |
// stop and join all threads; queue is cleared by default call with | |
// false to avoid clearing queue | |
// to "stop" threads an empty Caller instance per-thread is put into the | |
// queue; threads interpret an empty Caller as as stop signal and exit from | |
// the execution loop as soon as one is popped from the queue | |
// Note: this is the only safe way to terminate threads, other options like | |
// invoking explicit terminate functions where available are similar | |
// to killing a process with Ctrl-C, since however threads are not | |
// processes the resources allocated/acquired during the thread lifetime | |
// are not automatically released | |
void Stop(bool clearQueue = true) { // blocking | |
for (int t = 0; t != threads_.size(); ++t) | |
queue_.Push(std::unique_ptr<ICaller>(new Caller<void>)); | |
std::for_each(threads_.begin(), threads_.end(), | |
[](std::thread& t) { t.join(); }); | |
threads_.clear(); | |
queue_.Clear(); | |
} | |
// start or re-start with numthreads threads, queue is cleared by default | |
// call with ..., false to avoid clearing queue | |
void Start(int numthreads, bool clearQueue = true) { // non-blocking | |
if (numthreads < 1) { | |
throw std::range_error("Number of threads < 1"); | |
} | |
Stop(clearQueue); | |
StartThreads(numthreads); | |
} | |
// same as Start; in case the Executor is created with zero threads | |
// it makes sense to call start; if it's created with a number of threads | |
// greater than zero call Restart in client code | |
void Restart(int numthreads, bool clearQueue = true) { | |
Start(numthreads, clearQueue); | |
} | |
// join all threads | |
~Executor() { Stop(); } | |
private: | |
// start threads and put them into thread vector | |
void StartThreads(int nthreads) { | |
for (int t = 0; t != nthreads; ++t) { | |
threads_.push_back(std::move(std::thread([this] { | |
while (true) { | |
auto c = queue_.Pop(); | |
if (c->Empty()) { // interpret an empty Caller as a | |
//'terminate' message | |
break; | |
} | |
c->Invoke(); | |
} | |
}))); | |
} | |
} | |
private: | |
Queue queue_; // command queue | |
Threads threads_; // std::thread array; size == nthreads_ | |
}; | |
//------------------------------------------------------------------------------ | |
// Tag types used to pick an overload depending on whether a type is void.
struct VoidType {};
struct NonVoidType {};

// Maps a type to its tag: Void<T>::type is NonVoidType for every T ...
template <typename T>
struct Void {
    using type = NonVoidType;
};

// ... except for T == void, where it is VoidType.
template <>
struct Void<void> {
    using type = VoidType;
};
// Safe concurrent access to data by wrapping variable with object which | |
// synchronizes access to resource. | |
// Access can only happen by passing a functor which receives a reference to | |
// the wrapped resource. | |
template <typename T> | |
class ConcurrentAccess { | |
T data_; // warning 'mutable' cannot be applied to references | |
bool done_ = false; | |
SyncQueue<std::function<void()> > queue_; | |
std::future<void> f_; | |
public: | |
ConcurrentAccess() = delete; | |
ConcurrentAccess(const ConcurrentAccess&) = delete; | |
ConcurrentAccess(ConcurrentAccess&&) = delete; | |
ConcurrentAccess(T data, Executor& e) | |
: data_(data), f_(e([=] { | |
while (!done_) queue_.Pop()(); | |
})) {} | |
template <typename F> | |
auto operator()(F&& f) -> std::future<typename std::result_of<F(T)>::type> { | |
using R = typename std::result_of<F(T)>::type; | |
return Invoke(std::forward<F>(f), typename Void<R>::type()); | |
} | |
template <typename F> | |
auto Invoke(F&& f, const NonVoidType&) | |
-> std::future<typename std::result_of<F(T)>::type> { | |
using R = typename std::result_of<F(T)>::type; | |
auto p = std::make_shared<std::promise<R> >(std::promise<R>()); | |
auto ft = p->get_future(); | |
queue_.Push([=]() { | |
try { | |
p->set_value(f(data_)); | |
} catch (...) { | |
p->set_exception(std::current_exception()); | |
} | |
}); | |
return ft; | |
} | |
template <typename F> | |
std::future<void> Invoke(F&& f, const VoidType&) { | |
auto p = std::make_shared<std::promise<void> >(std::promise<void>()); | |
auto ft = p->get_future(); | |
queue_.Push([=]() { | |
try { | |
f(data_); | |
p->set_value(); | |
} catch (...) { | |
p->set_exception(std::current_exception()); | |
} | |
}); | |
return ft; | |
} | |
~ConcurrentAccess() { | |
queue_.Push([=] { done_ = true; }); | |
f_.wait(); | |
} | |
}; | |
//------------------------------------------------------------------------------ | |
int main(int argc, char** argv) { | |
try { | |
if (argc > 1 && std::string(argv[1]) == "-h") { | |
std::cout << argv[0] << "[task sleep time (ms)] " | |
<< "[number of tasks] " | |
<< "[number of threads]\n" | |
<< "default is (0,20,4)\n"; | |
return 0; | |
} | |
const int sleeptime_ms = argc > 1 ? atoi(argv[1]) : 0; | |
const int numtasks = argc > 2 ? atoi(argv[2]) : 4; | |
const int numthreads = argc > 3 ? atoi(argv[3]) : 2; | |
std::cout << "Running tasks...\n"; | |
std::cout << "Run-time configuration:\n" | |
<< " " << numtasks << " tasks\n" | |
<< " " << numthreads << " threads\n" | |
<< " " << sleeptime_ms << " ms task sleep time\n" | |
<< std::endl; | |
using namespace std; | |
Executor exec(numthreads); | |
Executor exec_aux(1); | |
string msg = "start\n"; | |
// if single threaded use a different executor for concurrent access | |
// if not it deadlocks | |
ConcurrentAccess<string&> text(msg, numthreads > 1 ? exec : exec_aux); | |
vector<future<void> > v; | |
cout << this_thread::get_id() << endl; | |
for (int i = 0; i != numtasks; ++i) | |
v.push_back(exec([&, i] { | |
const thread::id calling_thread = this_thread::get_id(); | |
std::this_thread::sleep_for( | |
std::chrono::milliseconds(sleeptime_ms)); | |
text([=](string& s) { | |
s += to_string(i) + " " + to_string(i); | |
s += "\n"; | |
}); | |
text([](const string& s) { cout << s; }); | |
})); | |
for (auto& f : v) f.wait(); | |
std::cout << "Done\n"; | |
return 0; | |
} catch (const std::exception& e) { | |
std::cerr << e.what() << std::endl; | |
return EXIT_FAILURE; | |
} | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment