Created
March 31, 2018 00:37
-
-
Save hamsham/3f03d7074324c63daaf7f4f913bfbd98 to your computer and use it in GitHub Desktop.
Simple worker thread/task queue test.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * Simple worker thread/task queue test.
 *
 * g++ -std=c++11 -Wall -Werror -Wextra -pedantic-errors work_thread.cpp -lpthread -o work_thread
 */
#include <atomic>
#include <cassert>
#include <chrono> // std::seconds
#include <condition_variable>
#include <cstddef> // ptrdiff_t
#include <cstdlib> // std::rand, std::srand
#include <ctime> // std::time
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <utility> // std::move
#include <vector>
/**
 * Abstract unit of work.
 *
 * Derived types implement execute(), which must not throw. Instances can be
 * neither copied nor moved, so a task is always referred to through a pointer
 * or reference to the one object that was originally constructed.
 */
struct WorkerTask
{
    WorkerTask() noexcept = default;

    // Virtual so that a task can be destroyed through a WorkerTask pointer.
    virtual ~WorkerTask() = default;

    // Copying is disabled.
    WorkerTask(const WorkerTask&) = delete;
    WorkerTask& operator=(const WorkerTask&) = delete;

    // Moving is disabled.
    WorkerTask(WorkerTask&&) = delete;
    WorkerTask& operator=(WorkerTask&&) = delete;

    /**
     * Perform the task's work. Implementations must not throw.
     */
    virtual void execute() noexcept = 0;
};
class WorkerThread | |
{ | |
private: | |
std::atomic_bool mIsPaused; | |
int mCurrentBuffer; | |
std::vector<WorkerTask*> mInputs[2]; | |
std::vector<WorkerTask*> mOutputs[2]; | |
std::mutex mMutex; | |
std::condition_variable mCondition; | |
std::thread mThread; | |
public: | |
~WorkerThread() noexcept | |
{ | |
mCurrentBuffer = -1; | |
mIsPaused.store(false, std::memory_order_relaxed); | |
mCondition.notify_one(); | |
mThread.join(); | |
} | |
WorkerThread() noexcept : | |
mIsPaused{true}, | |
mCurrentBuffer{0}, // must be greater than or equal to 0 for *this to run. | |
mInputs{}, | |
mOutputs{}, | |
mMutex{}, | |
mCondition{}, | |
mThread{} | |
{ | |
const auto threadLoop = [&]()->void | |
{ | |
while (true) | |
{ | |
// Lock the condition, wait until we're ready to execute. | |
// A while loop should also protect against spurious wake-ups | |
// by the condition variable. | |
while (mIsPaused) | |
{ | |
std::unique_lock<std::mutex> cvLock{mMutex}; | |
mCondition.wait(cvLock); | |
} | |
// Exit the thread if mCurrentBuffer is less than 0. | |
if (mCurrentBuffer < 0) | |
{ | |
break; | |
} | |
// The current I/O buffers were swapped when "flush()" was | |
// called. Swap again to read and write to the buffers used on | |
// this thread. | |
int currentBuffer; | |
std::vector<WorkerTask*>* inputQueue; | |
std::vector<WorkerTask*>* outputQueue; | |
// Lock access to the current buffers only when swapping. | |
{ | |
std::unique_lock<std::mutex> dataLock{mMutex}; | |
currentBuffer = (mCurrentBuffer + 1) % 2; | |
inputQueue = mInputs + currentBuffer; | |
outputQueue = mOutputs + currentBuffer; | |
} | |
std::vector<WorkerTask*>::size_type inSize = inputQueue->size(); | |
for (WorkerTask* pTask : *inputQueue) | |
{ | |
pTask->execute(); | |
outputQueue->push_back(pTask); | |
} | |
inputQueue->clear(); | |
std::vector<WorkerTask*>::size_type outSize = outputQueue->size(); | |
assert(inSize == outSize); | |
// Pause the current thread again. | |
mIsPaused.store(true, std::memory_order_relaxed); | |
} | |
}; | |
mThread = std::move(std::thread{threadLoop}); | |
} | |
WorkerThread(const WorkerThread&) = delete; | |
WorkerThread(WorkerThread&&) = delete; | |
WorkerThread& operator=(const WorkerThread&) = delete; | |
WorkerThread& operator=(WorkerThread&&) = delete; | |
inline void push(WorkerTask* task) | |
{ | |
mInputs[mCurrentBuffer].push_back(task); | |
} | |
void flush() | |
{ | |
const int currentBuffer = mCurrentBuffer; | |
const int nextBuffer = (mCurrentBuffer + 1) % 2; | |
if (mIsPaused.load(std::memory_order_relaxed)) | |
{ | |
std::unique_lock<std::mutex> lock{mMutex}; | |
mCurrentBuffer = nextBuffer; | |
mIsPaused.store(mInputs[currentBuffer].empty(), std::memory_order_relaxed); | |
mCondition.notify_one(); | |
} | |
} | |
inline bool ready() const | |
{ | |
return mIsPaused.load(std::memory_order_relaxed); | |
} | |
inline std::vector<WorkerTask*>& results() noexcept | |
{ | |
return mOutputs[mCurrentBuffer]; | |
} | |
}; | |
struct SampleTask final : public WorkerTask | |
{ | |
volatile int x = rand() % 2 + 1; | |
virtual void execute() noexcept override | |
{ | |
std::this_thread::sleep_for(std::chrono::seconds(x)); | |
} | |
}; | |
int main() | |
{ | |
srand(time(nullptr)); | |
WorkerThread thread{}; | |
std::vector<WorkerTask*> results; | |
std::vector<WorkerTask*>::size_type numExtraResults = 0; | |
thread.push(new SampleTask); | |
thread.push(new SampleTask); | |
thread.push(new SampleTask); | |
thread.push(new SampleTask); | |
thread.flush(); | |
while (!thread.ready()) | |
{ | |
std::cout << "Waiting for the worker thread to finish..." << std::endl; | |
std::this_thread::sleep_for(std::chrono::milliseconds{1000}); | |
// push tasks while waiting for the test to complete | |
thread.push(new SampleTask); | |
++numExtraResults; | |
} | |
thread.flush(); | |
results = std::move(thread.results()); | |
assert(results.size() == 4); | |
std::cout << "Received " << results.size() << " result tasks." << std::endl; | |
std::cout << "Thread ready state: " << thread.ready() << std::endl; | |
for (WorkerTask* pTask : results) | |
{ | |
delete pTask; | |
} | |
while (!thread.ready()) | |
{ | |
std::cout << "Waiting " << numExtraResults << " more tasks to finish..." << std::endl; | |
std::this_thread::sleep_for(std::chrono::seconds{1}); | |
} | |
thread.flush(); | |
results = std::move(thread.results()); | |
assert(results.size() == numExtraResults); | |
std::cout << "Received " << results.size() << " more results." << std::endl; | |
std::cout << "Thread ready state: " << thread.ready() << std::endl; | |
for (WorkerTask* pTask : results) | |
{ | |
delete pTask; | |
} | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment