Created
May 1, 2014 19:51
-
-
Save tomaka/59dddef9f91ba6c9acb5 to your computer and use it in GitHub Desktop.
Class which allows parallel execution in a separate thread
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "ThreadedCommandsQueue.hpp" | |
#include <future> | |
/**
 * Starts the worker thread and blocks until that thread has finished
 * publishing its state (stop flag, condition variable, queue and mutex)
 * through the pointer members of this object.
 *
 * If threadFunction ever lets an exception escape, the problem is logged and
 * the exception is rethrown from the thread entry point, which terminates the
 * program.
 */
ThreadedCommandsQueue::ThreadedCommandsQueue()
{
    std::promise<void> initializationFinished;
    auto initialized = initializationFinished.get_future();

    // The promise is moved into the closure, so it lives as long as the worker
    // thread does. Capturing a constructor-local promise by reference would let
    // this constructor destroy it while the worker is still inside set_value().
    mExecutingThread = std::thread([this, promise = std::move(initializationFinished)]() mutable {
        try {
            threadFunction(promise);
        } catch (...) {
            // Log first: in debug builds the assert below aborts immediately,
            // which would otherwise swallow the diagnostic.
            std::clog << "Unexpected exception in threaded commands queue" << std::endl;
            assert(false);
            throw;
        }
    });

    // Returns once threadFunction has called set_value(), i.e. once all pointer
    // members of this object have been filled in by the worker.
    initialized.get();
}
/**
 * Asks the worker thread to stop and waits for it to exit.
 * Does nothing when this object does not hold a joinable thread (for example
 * after its thread has been moved into another ThreadedCommandsQueue).
 */
ThreadedCommandsQueue::~ThreadedCommandsQueue() {
    if (mExecutingThread.joinable()) {
        {
            // The flag is raised and the worker is woken while the queue mutex
            // is held, so the worker cannot miss the notification between
            // checking the flag and going to sleep.
            std::unique_lock<std::mutex> guard(*mQueueAccessMutex);
            mExecutingThreadMustStop->store(true);
            mExecutingThreadSemaphore->notify_all();
        }

        mExecutingThread.join();
    }
}
void ThreadedCommandsQueue::threadFunction(std::promise<void>& initializationFinished) { | |
// this is the function that is executed in parallel | |
// creating all variables | |
std::atomic<bool> executingThreadMustStop; | |
std::condition_variable_any executingThreadSemaphore; | |
std::vector<std::unique_ptr<TaskInterface>> queue; | |
std::mutex queueAccessMutex; | |
executingThreadMustStop = false; | |
// storing pointers in "this" | |
mExecutingThreadMustStop = &executingThreadMustStop; | |
mExecutingThreadSemaphore = &executingThreadSemaphore; | |
mQueue = &queue; | |
mQueueAccessMutex = &queueAccessMutex; | |
// | |
initializationFinished.set_value(); | |
// we use a local copy of the _queue | |
// we are going to do this: swap the local queue and the main _queue, execute the local queue, swap again, execute again, etc. | |
decltype(queue) localQueue; | |
do { | |
// executing the content of the localQueue | |
for (auto& elem : localQueue) | |
elem->execute(); | |
localQueue.clear(); | |
// filling the queue again | |
{ std::unique_lock<std::mutex> lock(queueAccessMutex); | |
if (queue.empty() && !executingThreadMustStop) { | |
executingThreadSemaphore.wait(lock); | |
assert(!queue.empty() || executingThreadMustStop); // asserting that the condition_variable was not notified for no reason | |
} | |
localQueue.swap(queue); | |
} | |
} while (!localQueue.empty() || !executingThreadMustStop); | |
assert(localQueue.empty()); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef INCLUDE_THREADEDCOMMANDSQUEUE_HPP | |
#define INCLUDE_THREADEDCOMMANDSQUEUE_HPP | |
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <exception>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
/** | |
* This object starts a thread and allows you to execute tasks in it. | |
*/ | |
class ThreadedCommandsQueue {
public:
    /**
     * Starts the thread that executes the tasks
     */
    explicit ThreadedCommandsQueue();

    /**
     * Move constructor
     * Takes over the thread of "other" and copies the pointers to the state
     * that the thread keeps on its own stack. The destructor of "other" then
     * sees a non-joinable thread and does nothing.
     */
    ThreadedCommandsQueue(ThreadedCommandsQueue&& other) noexcept :
        mExecutingThread(std::move(other.mExecutingThread)),
        mExecutingThreadMustStop(other.mExecutingThreadMustStop),
        mExecutingThreadSemaphore(other.mExecutingThreadSemaphore),
        mQueue(other.mQueue),
        mQueueAccessMutex(other.mQueueAccessMutex)
    {
    }

    /**
     * Destructor
     * Waits for all the tasks to finish executing, then closes the thread
     */
    ~ThreadedCommandsQueue();

    /**
     * Copy is forbidden
     */
    ThreadedCommandsQueue(const ThreadedCommandsQueue&) = delete;

    /**
     * Copy is forbidden
     */
    ThreadedCommandsQueue& operator=(const ThreadedCommandsQueue&) = delete;

    /**
     * Adds a task to the queue
     * @param fn    A function object or function pointer/reference ; must be callable without argument
     * @return Returns a std::future which you can use to determine when the task is over ;
     *         it receives the return value of the task, or the exception thrown by it
     */
    template<typename TFunctionObject>
    auto addToQueue(TFunctionObject&& fn)
        -> std::future<decltype(fn())>
    {
        return addToQueueImpl<decltype(fn())>(std::forward<TFunctionObject>(fn));
    }

    /**
     * Adds a task to the queue, without a promise/future
     * If the task throws an exception, the exception is written to std::clog and otherwise dropped
     * @sa addToQueue
     */
    template<typename TFunctionObject>
    void addToQueueSimple(TFunctionObject&& fn)
    {
        //static_assert(noexcept(fn()), "Task passed to addToQueueSimple must not throw an exception");
        addToQueueSimpleImpl(std::forward<TFunctionObject>(fn));
    }

private:
    struct TaskInterface;

    // everything except the thread are actually local variables on the thread's stack
    // since the variables must be accessible from both this class and from inside the thread,
    // having the actual variables stored by the thread makes this class movable
    std::thread                                     mExecutingThread;                       // thread that executes threadFunction
    std::atomic<bool>*                              mExecutingThreadMustStop = nullptr;     // true means the thread must stop at its next loop
    std::condition_variable_any*                    mExecutingThreadSemaphore = nullptr;    // thanks to this, our mExecutingThread will sleep whenever the mQueue is empty
    std::vector<std::unique_ptr<TaskInterface>>*    mQueue = nullptr;                       // list of the tasks to execute
    std::mutex*                                     mQueueAccessMutex = nullptr;            // must be locked before accessing mQueue

    // this is the function that is executed by mExecutingThread
    // the function will start by initializing all member variables (except the thread of course)
    // and call initializationFinished.set_value()
    void threadFunction(std::promise<void>& initializationFinished);

    // interface for a task in the queue
    struct TaskInterface {
        virtual ~TaskInterface() = default;
        virtual void execute() const noexcept = 0;      // executes the task and updates the promise
    };

    // implementation of the TaskInterface with promise/future
    // the function object is taken by value and moved into place, so move-only callables are accepted
    template<typename TFunctionObject, typename TReturnType>
    struct TaskImplementation : TaskInterface {
        explicit TaskImplementation(std::promise<TReturnType>&& p, TFunctionObject f) : mFunction(std::move(f)), mPromise(std::move(p)) {}

        // forwards the result of the task, or the exception it threw, to the promise
        void execute() const noexcept override {
            try {
                mPromise.set_value(mFunction());
            } catch (...) {
                mPromise.set_exception(std::current_exception());
            }
        }

    private:
        TFunctionObject                     mFunction;
        mutable std::promise<TReturnType>   mPromise;
    };

    // template specialization of TaskImplementation for void return type
    // mPromise.set_value takes no parameter in this case, that's why we have to adapt
    template<typename TFunctionObject>
    struct TaskImplementation<TFunctionObject,void> : TaskInterface {
        explicit TaskImplementation(std::promise<void>&& p, TFunctionObject f) : mFunction(std::move(f)), mPromise(std::move(p)) {}

        // runs the task, then marks the promise as fulfilled or stores the exception it threw
        void execute() const noexcept override {
            try {
                mFunction();
                mPromise.set_value();
            } catch (...) {
                mPromise.set_exception(std::current_exception());
            }
        }

    private:
        TFunctionObject             mFunction;
        mutable std::promise<void>  mPromise;
    };

    // implementation of TaskInterface without promise/future
    template<typename TFunctionObject>
    struct TaskImplementationSimple : TaskInterface {
        explicit TaskImplementationSimple(TFunctionObject f) : mFunction(std::move(f)) {}

        // runs the task ; nobody can receive an exception here, so it is only logged
        void execute() const noexcept override {
            try {
                mFunction();
            } catch (const std::exception& e) {
                std::clog << "Exception when executing a threaded task" << '\n' << e.what() << std::endl;
            } catch (...) {
                std::clog << "Uncaught exception when executing a threaded task" << std::endl;
            }
        }

    private:
        TFunctionObject     mFunction;
    };

    // implementation of the addToQueue function
    template<typename TReturnType, typename TFunctionObject>
    std::future<TReturnType> addToQueueImpl(TFunctionObject&& fn) {
        assert(!*mExecutingThreadMustStop);     // if mExecutingThreadMustStop is true, that means the destructor has been called

        // building the promise and future ; both are associated to the same "asynchronous state"
        std::promise<TReturnType> promise;
        auto future = promise.get_future();

        // building the TaskImplementation object before taking the lock keeps the critical section short
        // our "promise" variable is std::move'd ; "fn" is forwarded, so lvalues are copied and rvalues are moved
        using Task = TaskImplementation<typename std::decay<TFunctionObject>::type,TReturnType>;
        auto task = std::make_unique<Task>(std::move(promise), std::forward<TFunctionObject>(fn));

        {
            std::lock_guard<std::mutex> lock(*mQueueAccessMutex);
            mQueue->push_back(std::move(task));
            // if our thread is waiting for the queue to fill, we awake it
            mExecutingThreadSemaphore->notify_all();
        }

        // returning the local by name lets the compiler elide or move it
        return future;
    }

    // implementation of the addToQueueSimple function
    template<typename TFunctionObject>
    void addToQueueSimpleImpl(TFunctionObject&& fn) {
        assert(!*mExecutingThreadMustStop);     // if mExecutingThreadMustStop is true, that means the destructor has been called

        // building the TaskImplementationSimple object before taking the lock keeps the critical section short
        using Task = TaskImplementationSimple<typename std::decay<TFunctionObject>::type>;
        auto task = std::make_unique<Task>(std::forward<TFunctionObject>(fn));

        {
            std::lock_guard<std::mutex> lock(*mQueueAccessMutex);
            mQueue->push_back(std::move(task));
            // if our thread is waiting for the queue to fill, we awake it
            mExecutingThreadSemaphore->notify_all();
        }
    }
};
#endif |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment