Skip to content

Instantly share code, notes, and snippets.

@dannvix
Created March 21, 2016 14:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dannvix/f194d9927f69e5572654 to your computer and use it in GitHub Desktop.
Save dannvix/f194d9927f69e5572654 to your computer and use it in GitHub Desktop.
Thread pool using std::thread and boost::lockfree::queue
#include <boost\lockfree\queue.hpp>
#include <iostream>
#include <thread>
#include <atomic>
#include <vector>
#include <memory>
#include <condition_variable>
#include <functional>
template<typename ItemType, unsigned long ulQueueCapacity>
class ThreadPool {
public:
ThreadPool(
unsigned long const ulThreadCount,
std::function<void(ItemType item)> pfItemCallbackRoutine)
: m_state(State::Genesis)
, m_ulThreadCount(ulThreadCount)
, m_pfItemCallbackRoutine(pfItemCallbackRoutine)
{
}
~ThreadPool()
{
{
State const nState = m_state.load(std::memory_order_relaxed);
bool bNeedToStop = (nState == State::Running);
if (bNeedToStop) {
stop();
}
}
}
// return TRUE if success
bool start()
{
{
State const nState = m_state.load(std::memory_order_relaxed);
bool bCanStart = (nState == State::Genesis);
if (!bCanStart) {
wprintf(L"ThreadPool start fail\n");
return false;
}
}
m_state.store(State::Starting, std::memory_order_relaxed);
try {
wprintf(L"!!! ulThreadCount=%lu\n", m_ulThreadCount);
for (size_t i = 0; i < m_dwThreadCount; ++i) {
std::thread thread = std::thread(std::bind(&ThreadPool::threadRoutine, this));
m_vThreads.push_back(std::move(thread));
}
}
catch(std::system_error const &e) {
wprintf(L"ThreadPool start failed, code=[%lu], what=[%s]\n", e.code(), e.what());
return false;
}
m_state.store(State::Running, std::memory_order_relaxed);
return true;
}
// return TRUE if success
bool stop()
{
{
State const nState = m_state.load(std::memory_order_relaxed);
bool bCanStop = (nState == State::Running);
if (!bCanStop) {
wprintf(L"ThreadPool stop failed");
return false;
}
}
m_state.store(State::Stopping, std::memory_order_relaxed);
m_condVarQueue.notify_all(); // wake all sleeping threads
for (std::thread &thread : m_vThreads) {
thread.join();
}
m_vThreads.clear();
m_queue.consume_all([](ItemType){});
m_state.store(State::Genesis, std::memory_order_relaxed);
return true;
}
// return true if success
bool enqueue(ItemType const &item)
{
if (m_queue.push(item)) {
m_condVarQueue.notify_one();
return true;
}
return false;
}
private: // disable copy and move
ThreadPool(ThreadPool&);
ThreadPool(ThreadPool&&);
ThreadPool& operator=(ThreadPool&);
ThreadPool& operator=(ThreadPool&&);
private: // methods
void threadRoutine()
{
while (1) {
std::unique_lock<std::mutex> lock(m_mutexQueueCondVar);
m_condVarQueue.wait(lock);
State const nState = m_state.load(std::memory_order_relaxed);
bool bContinueRunning = (nState == State::Running);
if (!bContinueRunning) {
return;
}
ItemType item;
if (m_queue.pop(/*out*/ item)) {
if (m_pfItemCallbackRoutine) {
m_pfItemCallbackRoutine(item);
}
}
}
}
private: // attributes
enum class State { Genesis, Starting, Running, Stopping };
std::atomic<State> m_state;
unsigned long m_ulThreadCount;
std::vector<std::thread> m_vThreads;
std::function<void(ItemType item)> m_pfItemCallbackRoutine;
std::condition_variable m_condVarQueue;
std::mutex m_mutexQueueCondVar;
boost::lockfree::queue<ItemType,
boost::lockfree::fixed_sized<true>,
boost::lockfree::capacity<ulQueueCapacity>> m_queue;
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment