Skip to content

Instantly share code, notes, and snippets.

@hamsham
Created March 31, 2018 00:37
Show Gist options
  • Save hamsham/3f03d7074324c63daaf7f4f913bfbd98 to your computer and use it in GitHub Desktop.
Save hamsham/3f03d7074324c63daaf7f4f913bfbd98 to your computer and use it in GitHub Desktop.
Simple worker thread/task queue test.
/*
* Simple worker thread/task queue test.
*
* g++ -std=c++11 -Wall -Werror -Wextra -pedantic-errors work_thread.cpp -lpthread -o work_thread
*/
#include <atomic>
#include <cassert>
#include <chrono> // std::seconds
#include <condition_variable>
#include <cstddef> // ptrdiff_t
#include <cstdlib> // rand, srand
#include <ctime> // time
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
/*
 * Abstract unit of work.
 *
 * Derived types implement execute(), which is declared noexcept. Tasks can
 * be neither copied nor moved, so they are always handled through pointers
 * or references; the virtual destructor makes deletion through a
 * WorkerTask* well-defined.
 */
struct WorkerTask
{
    WorkerTask() noexcept = default;
    virtual ~WorkerTask() = default;

    // Copying and moving are both disabled.
    WorkerTask(const WorkerTask&) = delete;
    WorkerTask& operator=(const WorkerTask&) = delete;
    WorkerTask(WorkerTask&&) = delete;
    WorkerTask& operator=(WorkerTask&&) = delete;

    // Performs the task's work.
    virtual void execute() noexcept = 0;
};
class WorkerThread
{
private:
std::atomic_bool mIsPaused;
int mCurrentBuffer;
std::vector<WorkerTask*> mInputs[2];
std::vector<WorkerTask*> mOutputs[2];
std::mutex mMutex;
std::condition_variable mCondition;
std::thread mThread;
public:
~WorkerThread() noexcept
{
mCurrentBuffer = -1;
mIsPaused.store(false, std::memory_order_relaxed);
mCondition.notify_one();
mThread.join();
}
WorkerThread() noexcept :
mIsPaused{true},
mCurrentBuffer{0}, // must be greater than or equal to 0 for *this to run.
mInputs{},
mOutputs{},
mMutex{},
mCondition{},
mThread{}
{
const auto threadLoop = [&]()->void
{
while (true)
{
// Lock the condition, wait until we're ready to execute.
// A while loop should also protect against spurious wake-ups
// by the condition variable.
while (mIsPaused)
{
std::unique_lock<std::mutex> cvLock{mMutex};
mCondition.wait(cvLock);
}
// Exit the thread if mCurrentBuffer is less than 0.
if (mCurrentBuffer < 0)
{
break;
}
// The current I/O buffers were swapped when "flush()" was
// called. Swap again to read and write to the buffers used on
// this thread.
int currentBuffer;
std::vector<WorkerTask*>* inputQueue;
std::vector<WorkerTask*>* outputQueue;
// Lock access to the current buffers only when swapping.
{
std::unique_lock<std::mutex> dataLock{mMutex};
currentBuffer = (mCurrentBuffer + 1) % 2;
inputQueue = mInputs + currentBuffer;
outputQueue = mOutputs + currentBuffer;
}
std::vector<WorkerTask*>::size_type inSize = inputQueue->size();
for (WorkerTask* pTask : *inputQueue)
{
pTask->execute();
outputQueue->push_back(pTask);
}
inputQueue->clear();
std::vector<WorkerTask*>::size_type outSize = outputQueue->size();
assert(inSize == outSize);
// Pause the current thread again.
mIsPaused.store(true, std::memory_order_relaxed);
}
};
mThread = std::move(std::thread{threadLoop});
}
WorkerThread(const WorkerThread&) = delete;
WorkerThread(WorkerThread&&) = delete;
WorkerThread& operator=(const WorkerThread&) = delete;
WorkerThread& operator=(WorkerThread&&) = delete;
inline void push(WorkerTask* task)
{
mInputs[mCurrentBuffer].push_back(task);
}
void flush()
{
const int currentBuffer = mCurrentBuffer;
const int nextBuffer = (mCurrentBuffer + 1) % 2;
if (mIsPaused.load(std::memory_order_relaxed))
{
std::unique_lock<std::mutex> lock{mMutex};
mCurrentBuffer = nextBuffer;
mIsPaused.store(mInputs[currentBuffer].empty(), std::memory_order_relaxed);
mCondition.notify_one();
}
}
inline bool ready() const
{
return mIsPaused.load(std::memory_order_relaxed);
}
inline std::vector<WorkerTask*>& results() noexcept
{
return mOutputs[mCurrentBuffer];
}
};
struct SampleTask final : public WorkerTask
{
volatile int x = rand() % 2 + 1;
virtual void execute() noexcept override
{
std::this_thread::sleep_for(std::chrono::seconds(x));
}
};
int main()
{
srand(time(nullptr));
WorkerThread thread{};
std::vector<WorkerTask*> results;
std::vector<WorkerTask*>::size_type numExtraResults = 0;
thread.push(new SampleTask);
thread.push(new SampleTask);
thread.push(new SampleTask);
thread.push(new SampleTask);
thread.flush();
while (!thread.ready())
{
std::cout << "Waiting for the worker thread to finish..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds{1000});
// push tasks while waiting for the test to complete
thread.push(new SampleTask);
++numExtraResults;
}
thread.flush();
results = std::move(thread.results());
assert(results.size() == 4);
std::cout << "Received " << results.size() << " result tasks." << std::endl;
std::cout << "Thread ready state: " << thread.ready() << std::endl;
for (WorkerTask* pTask : results)
{
delete pTask;
}
while (!thread.ready())
{
std::cout << "Waiting " << numExtraResults << " more tasks to finish..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds{1});
}
thread.flush();
results = std::move(thread.results());
assert(results.size() == numExtraResults);
std::cout << "Received " << results.size() << " more results." << std::endl;
std::cout << "Thread ready state: " << thread.ready() << std::endl;
for (WorkerTask* pTask : results)
{
delete pTask;
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment