Skip to content

Instantly share code, notes, and snippets.

@tomaka
Created May 1, 2014 19:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tomaka/59dddef9f91ba6c9acb5 to your computer and use it in GitHub Desktop.
Save tomaka/59dddef9f91ba6c9acb5 to your computer and use it in GitHub Desktop.
Class which allows parallel execution in a separate thread
#include "ThreadedCommandsQueue.hpp"
#include <future>
/**
 * Starts the worker thread and blocks until it has finished setting up the
 * shared state (stop flag, condition variable, queue and mutex).
 *
 * threadFunction() stores pointers to its stack-local state into this object
 * and then fulfils the promise, so the m* pointers are set by the time this
 * constructor returns.
 */
ThreadedCommandsQueue::ThreadedCommandsQueue()
{
    std::promise<void> initializationFinished;
    auto initialized = initializationFinished.get_future();

    mExecutingThread = std::thread([this, &initializationFinished]() {
        try {
            threadFunction(initializationFinished);
        } catch (...) {
            // Log first: in debug builds assert(false) aborts the process, and
            // the diagnostic would otherwise never be printed.
            std::clog << "Unexpected exception in threaded commands queue" << std::endl;
            assert(false);
            // An exception leaving a thread's entry function calls std::terminate().
            throw;
        }
    });

    // The promise lives in this stack frame and is referenced by the worker,
    // so we must not return before the worker has fulfilled it.
    initialized.get();
}
/**
 * Asks the worker thread to stop and waits for it to exit.
 * Does nothing when this object holds no joinable thread.
 */
ThreadedCommandsQueue::~ThreadedCommandsQueue() {
    if (mExecutingThread.joinable()) {
        {
            // Raise the stop flag and wake the worker while holding the queue
            // mutex, the same mutex the worker holds when it decides to sleep.
            std::lock_guard<std::mutex> guard(*mQueueAccessMutex);
            mExecutingThreadMustStop->store(true);
            mExecutingThreadSemaphore->notify_all();
        }
        mExecutingThread.join();
    }
}
/**
 * Body of the worker thread.
 *
 * Creates the shared state as local variables, stores pointers to them in
 * this object, signals the constructor through initializationFinished, and
 * then executes queued tasks until the stop flag is set and the queue is empty.
 *
 * @param initializationFinished Fulfilled once the m* pointers have been set;
 *                               not touched afterwards
 */
void ThreadedCommandsQueue::threadFunction(std::promise<void>& initializationFinished) {
    // The shared state lives on this thread's stack.
    std::atomic<bool> executingThreadMustStop{false};
    std::condition_variable_any executingThreadSemaphore;
    std::vector<std::unique_ptr<TaskInterface>> queue;
    std::mutex queueAccessMutex;

    // Store pointers to the shared state in "this".
    mExecutingThreadMustStop = &executingThreadMustStop;
    mExecutingThreadSemaphore = &executingThreadSemaphore;
    mQueue = &queue;
    mQueueAccessMutex = &queueAccessMutex;

    // From here on the constructor may return.
    initializationFinished.set_value();

    // Tasks are executed from a local batch so the mutex is not held while
    // they run: swap the shared queue into the batch, run the batch, repeat.
    decltype(queue) localQueue;
    do {
        // Run the current batch (empty on the first iteration).
        for (auto& task : localQueue)
            task->execute();
        localQueue.clear();

        // Fetch the next batch, sleeping while there is nothing to do.
        {
            std::unique_lock<std::mutex> lock(queueAccessMutex);
            // The predicate form re-checks the condition after every wake-up,
            // so a spurious wake-up just goes back to sleep.
            executingThreadSemaphore.wait(lock, [&] {
                return !queue.empty() || executingThreadMustStop.load();
            });
            localQueue.swap(queue);
        }
    } while (!localQueue.empty() || !executingThreadMustStop);

    assert(localQueue.empty());
}
#ifndef INCLUDE_THREADEDCOMMANDSQUEUE_HPP
#define INCLUDE_THREADEDCOMMANDSQUEUE_HPP
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <exception>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
/**
* This object starts a thread and allows you to execute tasks in it.
*/
/**
 * This object starts a thread and allows you to execute tasks in it.
 *
 * Tasks are appended to a queue guarded by a mutex and are executed by the
 * worker thread in the order they were added. The object is movable but not
 * copyable; a moved-from object no longer owns a thread and must not be
 * given new tasks.
 */
class ThreadedCommandsQueue {
public:
    /**
     * Starts the worker thread.
     */
    explicit ThreadedCommandsQueue();

    /**
     * Move constructor
     * Takes over the thread and the pointers to its shared state. The
     * pointers of `other` are reset to nullptr so it cannot be used to
     * reach state it no longer owns.
     */
    ThreadedCommandsQueue(ThreadedCommandsQueue&& other) noexcept :
        mExecutingThread(std::move(other.mExecutingThread)),
        mExecutingThreadMustStop(std::exchange(other.mExecutingThreadMustStop, nullptr)),
        mExecutingThreadSemaphore(std::exchange(other.mExecutingThreadSemaphore, nullptr)),
        mQueue(std::exchange(other.mQueue, nullptr)),
        mQueueAccessMutex(std::exchange(other.mQueueAccessMutex, nullptr))
    {
    }

    /**
     * Destructor
     * Waits for all the tasks to finish executing, then closes the thread
     */
    ~ThreadedCommandsQueue();

    /**
     * Copy is forbidden
     */
    ThreadedCommandsQueue(const ThreadedCommandsQueue&) = delete;

    /**
     * Copy is forbidden
     */
    ThreadedCommandsQueue& operator=(const ThreadedCommandsQueue&) = delete;

    /**
     * Adds a task to the queue
     * @param fn A function object or function pointer/reference ; must be callable without argument
     * @return Returns a std::future which you can use to determine when the task is over ;
     *         it receives the task's return value, or the exception the task threw
     */
    template<typename TFunctionObject>
    auto addToQueue(TFunctionObject&& fn)
        -> std::future<decltype(fn())>
    {
        return addToQueueImpl<decltype(fn())>(std::forward<TFunctionObject>(fn));
    }

    /**
     * Adds a task to the queue, without a promise/future
     * If the task throws an exception, it is caught and a message is written
     * to std::clog ; the exception is not reported to the caller
     * @sa addToQueue
     */
    template<typename TFunctionObject>
    void addToQueueSimple(TFunctionObject&& fn)
    {
        //static_assert(noexcept(fn()), "Task passed to addToQueueSimple must not throw an exception");
        addToQueueSimpleImpl(std::forward<TFunctionObject>(fn));
    }

private:
    struct TaskInterface;

    // everything except the thread are actually local variables on the thread's stack
    // since the variables must be accessible from both this class and from inside the thread,
    // having the actual variables stored by the thread makes this class movable
    std::thread mExecutingThread;                                       // thread that executes threadFunction
    std::atomic<bool>* mExecutingThreadMustStop = nullptr;              // true means the thread must stop at its next loop
    std::condition_variable_any* mExecutingThreadSemaphore = nullptr;   // thanks to this, our mExecutingThread will sleep whenever the mQueue is empty
    std::vector<std::unique_ptr<TaskInterface>>* mQueue = nullptr;      // list of the tasks to execute
    std::mutex* mQueueAccessMutex = nullptr;                            // must be locked before accessing mQueue

    // this is the function that is executed by mExecutingThread
    // the function will start by initializing all member variables (except the thread of course)
    // and call initializationFinished.set_value()
    void threadFunction(std::promise<void>& initializationFinished);

    // interface for a task in the queue
    struct TaskInterface {
        virtual ~TaskInterface() {}
        virtual void execute() const noexcept = 0;      // executes the task and updates the promise
    };

    // implementation of the TaskInterface with promise/future
    template<typename TFunctionObject, typename TReturnType>
    struct TaskImplementation : TaskInterface {
        // the callable is taken by value and moved into place, so rvalues
        // (including move-only callables) are never copied
        explicit TaskImplementation(std::promise<TReturnType>&& p, TFunctionObject f)
            : mFunction(std::move(f)), mPromise(std::move(p)) {}

        // runs the callable ; its result or its exception goes into the promise
        void execute() const noexcept override {
            try {
                mPromise.set_value(mFunction());
            } catch (...) {
                mPromise.set_exception(std::current_exception());
            }
        }

    private:
        TFunctionObject mFunction;
        mutable std::promise<TReturnType> mPromise;
    };

    // template specialization of TaskImplementation for void return type
    // mPromise.set_value takes no parameter in this case, that's why we have to adapt
    template<typename TFunctionObject>
    struct TaskImplementation<TFunctionObject,void> : TaskInterface {
        explicit TaskImplementation(std::promise<void>&& p, TFunctionObject f)
            : mFunction(std::move(f)), mPromise(std::move(p)) {}

        void execute() const noexcept override {
            try {
                mFunction();
                mPromise.set_value();
            } catch (...) {
                mPromise.set_exception(std::current_exception());
            }
        }

    private:
        TFunctionObject mFunction;
        mutable std::promise<void> mPromise;
    };

    // implementation of TaskInterface without promise/future
    // exceptions thrown by the callable are caught and logged to std::clog
    template<typename TFunctionObject>
    struct TaskImplementationSimple : TaskInterface {
        explicit TaskImplementationSimple(TFunctionObject f) : mFunction(std::move(f)) {}

        void execute() const noexcept override {
            try {
                mFunction();
            } catch (const std::exception& e) {
                std::clog << "Exception when executing a threaded task" << '\n' << e.what() << std::endl;
            } catch (...) {
                std::clog << "Uncaught exception when executing a threaded task" << std::endl;
            }
        }

    private:
        TFunctionObject mFunction;
    };

    // implementation of the addToQueue function
    template<typename TReturnType, typename TFunctionObject>
    std::future<TReturnType> addToQueueImpl(TFunctionObject&& fn) {
        assert(mExecutingThreadMustStop != nullptr);    // nullptr means this object has been moved from
        assert(!*mExecutingThreadMustStop);             // if mExecutingThreadMustStop is true, that means the destructor has been called

        // building the promise and future ; both are associated to the same "asynchronous state"
        std::promise<TReturnType> promise;
        auto future = promise.get_future();

        // building a TaskImplementation object and adding it to the queue
        // our "promise" variable is std::move'd ; "fn" is forwarded so that
        // lvalues are copied and rvalues are moved
        {
            std::lock_guard<std::mutex> lock(*mQueueAccessMutex);
            using Task = TaskImplementation<typename std::decay<TFunctionObject>::type, TReturnType>;
            mQueue->push_back(std::make_unique<Task>(std::move(promise), std::forward<TFunctionObject>(fn)));
            // if our thread is waiting for the queue to fill, we awake it
            mExecutingThreadSemaphore->notify_all();
        }

        // returned by name : a local is moved implicitly on return
        return future;
    }

    // implementation of the addToQueueSimple function
    template<typename TFunctionObject>
    void addToQueueSimpleImpl(TFunctionObject&& fn) {
        assert(mExecutingThreadMustStop != nullptr);    // nullptr means this object has been moved from
        assert(!*mExecutingThreadMustStop);             // if mExecutingThreadMustStop is true, that means the destructor has been called

        // building a TaskImplementationSimple object and adding it to the queue
        {
            std::lock_guard<std::mutex> lock(*mQueueAccessMutex);
            using Task = TaskImplementationSimple<typename std::decay<TFunctionObject>::type>;
            mQueue->push_back(std::make_unique<Task>(std::forward<TFunctionObject>(fn)));
            // if our thread is waiting for the queue to fill, we awake it
            mExecutingThreadSemaphore->notify_all();
        }
    }
};
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment