Skip to content

Instantly share code, notes, and snippets.

@ugovaretto
Last active February 12, 2022 02:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ugovaretto/fb761c536a8096d3b8fff13f734654d4 to your computer and use it in GitHub Desktop.
Save ugovaretto/fb761c536a8096d3b8fff13f734654d4 to your computer and use it in GitHub Desktop.
Task executor and wrapper for concurrent access to variable
// Author: Ugo Varetto
// Implementation of task-based concurrency (similar to Java's Executor)
// and wrapper for concurrent access
// g++ task-based-executor-concurrent-generic.cpp -std=c++11 -pthread
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <cstdlib> //EXIT_*
#include <deque>
#include <functional>
#include <future>
#include <iostream>
#include <map>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
//------------------------------------------------------------------------------
// synchronized queue (could be an inner class inside Executor):
// - acquire lock on insertion and notify after insertion
// - on extraction: acquire lock then if queue empty wait for notify, extract
// element
template <typename T>
class SyncQueue {
public:
void Push(const T& e) {
// simple scoped lock: acquire mutex in constructor,
// release in destructor
std::lock_guard<std::mutex> guard(mutex_);
queue_.push_front(e);
cond_.notify_one(); // notify
}
void Push(T&& e) {
std::lock_guard<std::mutex> guard(mutex_);
queue_.push_front(std::move(e));
cond_.notify_one(); // notify
}
T Pop() {
// cannot use simple scoped lock here because lock passed to
// wait must be able to acquire and release the mutex
std::unique_lock<std::mutex> lock(mutex_);
// stop and wait for notification if condition is false;
// continue otherwise
cond_.wait(lock, [this] { return !queue_.empty(); });
T e = std::move(queue_.back());
queue_.pop_back();
return e;
}
void Clear() { queue_.clear(); }
private:
std::deque<T> queue_;
std::mutex mutex_;
std::condition_variable cond_;
};
//------------------------------------------------------------------------------
// interface for callable objects
struct ICaller {
virtual bool Empty() const = 0;
virtual void Invoke() = 0;
virtual ~ICaller() {}
};
// callable object stored in queue shared among threads: parameters are
// bound at object construction time
template <typename ResultType>
class Caller : public ICaller {
public:
template <typename F, typename... Args>
Caller(F&& f, Args&&... args)
: f_(std::bind(std::forward<F>(f), std::forward<Args>(args)...)),
empty_(false) {}
Caller() : empty_(true) {}
std::future<ResultType> GetFuture() { return p_.get_future(); }
void Invoke() {
try {
ResultType r = ResultType(f_());
p_.set_value(r);
} catch (...) {
p_.set_exception(std::current_exception());
}
}
bool Empty() const { return empty_; }
private:
std::promise<ResultType> p_;
std::function<ResultType()> f_;
bool empty_;
};
// specialization for void return type
template <>
class Caller<void> : public ICaller {
public:
template <typename F, typename... Args>
Caller(F f, Args... args) : f_(std::bind(f, args...)), empty_(false) {}
Caller() : empty_(true) {}
std::future<void> GetFuture() { return p_.get_future(); }
void Invoke() {
try {
f_();
p_.set_value();
} catch (...) {
p_.set_exception(std::current_exception());
}
}
bool Empty() const { return empty_; }
private:
std::promise<void> p_;
std::function<void()> f_;
bool empty_;
};
//------------------------------------------------------------------------------
// task executor: asynchronously execute callable objects. Specify the max
// number of threads to use at Executor construction time; threads are started
// in the constructor and joined in the destructor
class Executor {
typedef SyncQueue<std::unique_ptr<ICaller> > Queue;
typedef std::vector<std::thread> Threads;
public:
Executor(int numthreads = std::thread::hardware_concurrency()) {
StartThreads(numthreads);
}
#ifdef USE_RESULT_OF
// deferred call to f with args parameters
// 1. all the arguments are bound to a function object taking zero
// parameters
// which is put into the shared queue
// 2. std::future is returned
template <typename F, typename... Args>
auto operator()(F&& f, Args... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
if (threads_.empty()) throw std::logic_error("No active threads");
typedef typename std::result_of<F(Args...)>::type ResultType;
Caller<ResultType>* c = new Caller<ResultType>(
std::forward<F>(f), std::forward<Args>(args)...);
std::future<ResultType> ft = c->GetFuture();
queue_.Push(c);
return ft;
}
#else
template <typename F, typename... Args>
auto operator()(F&& f, Args... args) -> std::future<decltype(f(args...))> {
if (threads_.empty()) throw std::logic_error("No active threads");
typedef decltype(f(args...)) ResultType;
Caller<ResultType>* c = new Caller<ResultType>(
std::forward<F>(f), std::forward<Args>(args)...);
std::future<ResultType> ft = c->GetFuture();
queue_.Push(std::unique_ptr<ICaller>(c));
return ft;
}
#endif
// stop and join all threads; queue is cleared by default call with
// false to avoid clearing queue
// to "stop" threads an empty Caller instance per-thread is put into the
// queue; threads interpret an empty Caller as as stop signal and exit from
// the execution loop as soon as one is popped from the queue
// Note: this is the only safe way to terminate threads, other options like
// invoking explicit terminate functions where available are similar
// to killing a process with Ctrl-C, since however threads are not
// processes the resources allocated/acquired during the thread lifetime
// are not automatically released
void Stop(bool clearQueue = true) { // blocking
for (int t = 0; t != threads_.size(); ++t)
queue_.Push(std::unique_ptr<ICaller>(new Caller<void>));
std::for_each(threads_.begin(), threads_.end(),
[](std::thread& t) { t.join(); });
threads_.clear();
queue_.Clear();
}
// start or re-start with numthreads threads, queue is cleared by default
// call with ..., false to avoid clearing queue
void Start(int numthreads, bool clearQueue = true) { // non-blocking
if (numthreads < 1) {
throw std::range_error("Number of threads < 1");
}
Stop(clearQueue);
StartThreads(numthreads);
}
// same as Start; in case the Executor is created with zero threads
// it makes sense to call start; if it's created with a number of threads
// greater than zero call Restart in client code
void Restart(int numthreads, bool clearQueue = true) {
Start(numthreads, clearQueue);
}
// join all threads
~Executor() { Stop(); }
private:
// start threads and put them into thread vector
void StartThreads(int nthreads) {
for (int t = 0; t != nthreads; ++t) {
threads_.push_back(std::move(std::thread([this] {
while (true) {
auto c = queue_.Pop();
if (c->Empty()) { // interpret an empty Caller as a
//'terminate' message
break;
}
c->Invoke();
}
})));
}
}
private:
Queue queue_; // command queue
Threads threads_; // std::thread array; size == nthreads_
};
//------------------------------------------------------------------------------
struct VoidType {};
struct NonVoidType {};
template <typename T>
struct Void {
typedef NonVoidType type;
};
template <>
struct Void<void> {
typedef VoidType type;
};
// Safe concurrent access to data by wrapping variable with object which
// synchronizes access to resource.
// Access can only happen by passing a functor which receives a reference to
// the wrapped resource.
template <typename T>
class ConcurrentAccess {
T data_; // warning 'mutable' cannot be applied to references
bool done_ = false;
SyncQueue<std::function<void()> > queue_;
std::future<void> f_;
public:
ConcurrentAccess() = delete;
ConcurrentAccess(const ConcurrentAccess&) = delete;
ConcurrentAccess(ConcurrentAccess&&) = delete;
ConcurrentAccess(T data, Executor& e)
: data_(data), f_(e([=] {
while (!done_) queue_.Pop()();
})) {}
template <typename F>
auto operator()(F&& f) -> std::future<typename std::result_of<F(T)>::type> {
using R = typename std::result_of<F(T)>::type;
return Invoke(std::forward<F>(f), typename Void<R>::type());
}
template <typename F>
auto Invoke(F&& f, const NonVoidType&)
-> std::future<typename std::result_of<F(T)>::type> {
using R = typename std::result_of<F(T)>::type;
auto p = std::make_shared<std::promise<R> >(std::promise<R>());
auto ft = p->get_future();
queue_.Push([=]() {
try {
p->set_value(f(data_));
} catch (...) {
p->set_exception(std::current_exception());
}
});
return ft;
}
template <typename F>
std::future<void> Invoke(F&& f, const VoidType&) {
auto p = std::make_shared<std::promise<void> >(std::promise<void>());
auto ft = p->get_future();
queue_.Push([=]() {
try {
f(data_);
p->set_value();
} catch (...) {
p->set_exception(std::current_exception());
}
});
return ft;
}
~ConcurrentAccess() {
queue_.Push([=] { done_ = true; });
f_.wait();
}
};
//------------------------------------------------------------------------------
int main(int argc, char** argv) {
try {
if (argc > 1 && std::string(argv[1]) == "-h") {
std::cout << argv[0] << "[task sleep time (ms)] "
<< "[number of tasks] "
<< "[number of threads]\n"
<< "default is (0,20,4)\n";
return 0;
}
const int sleeptime_ms = argc > 1 ? atoi(argv[1]) : 0;
const int numtasks = argc > 2 ? atoi(argv[2]) : 4;
const int numthreads = argc > 3 ? atoi(argv[3]) : 2;
std::cout << "Running tasks...\n";
std::cout << "Run-time configuration:\n"
<< " " << numtasks << " tasks\n"
<< " " << numthreads << " threads\n"
<< " " << sleeptime_ms << " ms task sleep time\n"
<< std::endl;
using namespace std;
Executor exec(numthreads);
Executor exec_aux(1);
string msg = "start\n";
// if single threaded use a different executor for concurrent access
// if not it deadlocks
ConcurrentAccess<string&> text(msg, numthreads > 1 ? exec : exec_aux);
vector<future<void> > v;
cout << this_thread::get_id() << endl;
for (int i = 0; i != numtasks; ++i)
v.push_back(exec([&, i] {
const thread::id calling_thread = this_thread::get_id();
std::this_thread::sleep_for(
std::chrono::milliseconds(sleeptime_ms));
text([=](string& s) {
s += to_string(i) + " " + to_string(i);
s += "\n";
});
text([](const string& s) { cout << s; });
}));
for (auto& f : v) f.wait();
std::cout << "Done\n";
return 0;
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
return EXIT_FAILURE;
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment