ThreadPool: Use all your cores
/*
* Test the thread_pool class
*
* Compile with g++ -I.. ../thread_pool.cpp thread_pool-test.cpp -pthread
*/
#include <iostream>
#include <mutex>
#include <stdexcept>
#include "thread_pool.h"
using namespace std;
constexpr size_t threads {0}; // concurrency 0=>platform optimum
constexpr size_t jobs {3}; // number of jobs to do
// Each delegate function will take a C-string and an int.
struct Args {
const char *word;
int num;
};
// We're going to fire off the thread pool four times
// with four different argument sets.
//
// Using void** avoids typecasting later.
// Alternatively, cast to void** in manifold() call.
void* words[][jobs] = {
{ new Args {"One", 1}, new Args {"Two", 2}, new Args {"Three", 3} },
{ new Args {"Four", 1}, new Args {"Five", 2}, new Args {"Six", 3} },
{ new Args {"One", 1}, new Args {"Two", -2}, new Args {"Three", 3} },
{ new Args {"Unus", 1}, new Args {"Duo", 2}, new Args {"Tres", 3} },
};
// The delegate function accepts a void*
// which it casts to point at its particular argument type.
// Alternatively, cast to ThreadPool::delegate_t in manifold() call.
void say(void* words) {
Args* w(static_cast<Args*>(words));
int how_many {w->num};
if (how_many < 0)
throw(invalid_argument("Iteration count is negative"));
static mutex m;
lock_guard<mutex> lk(m);
for (int i = 0 ; i < how_many ; i++)
cout << w->word << '\t';
cout << endl;
}
int main()
{
// Requesting 0 threads causes the number to be
// equal to the number of CPU cores available on this platform
ThreadPool p(threads);
cout << p.threads << " worker threads are available on this platform.\n\n";
// Direct invocation
// manifold() blocks until all of the jobs have completed.
p.manifold(ThreadPool::delegate_t(say), words[0], jobs);
cout << endl;
// Invocation via lambda expression
// permits use of, e.g., non-static member functions
auto d { [](void* w){say(w);} };
p.manifold(d, words[1], jobs);
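// (Illustrative only, not part of this test.) If the work lived inside a
// class, a capturing lambda could forward to a non-static member function,
// e.g. for a hypothetical Speaker::say(void*):
//   Speaker s;
//   auto md { [&s](void* w){ s.say(w); } };
//   p.manifold(md, words[1], jobs);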
cout << endl;
// Exception handling
try {
p.manifold(d, words[2], jobs);
} catch(const std::invalid_argument& ia) {
std::cerr << "Invalid_argument: " << ia.what() << std::endl;
}
cout << endl;
// Foreign language support
// (checks exceptions are cleared properly)
p.manifold(d, words[3], jobs);
cout << endl;
cout << "\nFinished\n";
// I should probably delete the Args in words[][] now...
return 0;
}
#include <algorithm> // std::min
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include "thread_pool.h"
//#include <iostream>
ThreadPool::ThreadPool(std::size_t numThreads)
: remain(0)
, threads(numThreads ? numThreads : std::thread::hardware_concurrency())
{
states = std::unique_ptr<enum state[]> {
new enum state[threads]
};
exceptions = std::unique_ptr<std::exception_ptr[]> {
new std::exception_ptr[threads]
};
for (std::size_t i {0} ; i < threads ; i++)
states[i] = waiting;
workers = std::unique_ptr<std::thread[]> {
new std::thread[threads]
};
for (std::size_t i {0} ; i < threads ; i++)
workers[i] = std::thread(&ThreadPool::dispatcher, this, i);
}
ThreadPool::~ThreadPool()
{
// Tell all the threads to die
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [this]{ return remain==0; });
for (std::size_t i {0} ; i < threads ; i++)
states[i] = dying;
remain = threads;
lk.unlock();
cv.notify_all();
for (std::size_t i {0} ; i < threads ; i++) {
workers[i].join();
}
}
void ThreadPool::wait_raise(std::unique_lock<std::mutex>& lk, const std::size_t to_check)
{
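// Block until the current batch has drained (remain == 0), then rethrow
// the first exception, if any, recorded by the workers being checked.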
cv.wait(lk, [this]{ return remain==0; });
for (std::size_t i {0} ; i < to_check ; i++)
if (exceptions[i])
std::rethrow_exception(exceptions[i]);
}
void ThreadPool::manifold(delegate_t delegate,
void** params,
std::size_t jobs)
{
this->params = params;
this->delegate = delegate;
remain = 0;
std::size_t to_start {0};
// Clear any old exceptions before reusing the threads
for (std::size_t i {0} ; i < threads ; i++)
exceptions[i] = nullptr;
std::unique_lock<std::mutex> lk(m);
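// Run the jobs in batches of at most `threads` workers: each pass waits for
// the previous batch to drain, advances `params` past the jobs already
// dispatched, then marks one worker per remaining job as running.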
while(remain < jobs) {
wait_raise(lk, to_start);
this->params += to_start;
jobs -= to_start;
to_start = std::min(jobs, this->threads);
for (std::size_t i {0} ; i < to_start ; i++)
states[i] = running;
remain = to_start;
lk.unlock();
cv.notify_all();
lk.lock();
}
wait_raise(lk, to_start);
}
void ThreadPool::dispatcher(int id)
{
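// Worker main loop: sleep until this thread's state leaves `waiting`, then
// either shut down (on `dying`) or run the delegate on params[id],
// capturing any exception for manifold() to rethrow.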
std::unique_lock<std::mutex> lk(m, std::defer_lock);
while(true) {
lk.lock();
cv.wait(lk, [&id,this]{ return states[id] != waiting; });
// Time to die?
if (states[id] == dying) {
--remain;
states[id] = dead;
lk.unlock();
cv.notify_all();
return;
}
lk.unlock();
// Call delegate, passing parameter block
try {
delegate(params[id]);
} catch(...) {
exceptions[id] = std::current_exception();
}
lk.lock();
states[id] = waiting;
--remain;
lk.unlock();
cv.notify_all();
}
}
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <stdexcept>
/*!
* A utility class to permit a single function to be run on multiple
* datasets concurrently.
*
* The class may be instantiated with the number of concurrent threads
* to run; if 0 threads are requested (the default), the number is
* chosen to match the hardware concurrency of the platform on which
* the application runs.
*
* The caller must prepare an array of parameter blocks ("jobs"). These
* along with the delegate function are passed to the manifold()
* method, and each concurrent invocation of the delegate function
* will be passed a separate parameter block.
*
* The number of jobs must be defined in the manifold() call
* but need not be equal to the number of threads. If the number
* of jobs exceeds the number of threads, work will be performed
* in batches. The size of each batch is the number of threads
* given when the ThreadPool object is created. manifold()
* does not return until all jobs are complete.
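*
* For example (illustrative): a pool of 4 threads given 10 jobs will
* run them as three batches of 4, 4 and 2 jobs.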
*
* Threads are reused across calls to manifold(). When the ThreadPool
* is destroyed, it closes down all threads and awaits their
* proper termination.
*/
class ThreadPool {
public:
/*! The type of the delegate function each thread will run */
typedef std::function<void(void*)> delegate_t;
protected:
/*! thread state descriptor */
enum state {waiting, running, dying, dead};
/*! Worker threads*/
std::unique_ptr<std::thread[]> workers;
/*! State variables for each thread */
std::unique_ptr<enum state[]> states;
/*! exception_ptrs for each thread */
std::unique_ptr<std::exception_ptr[]> exceptions;
/*! Parameters for each thread, passed to the delegate function */
void** params;
/*! The work the workers are supposed to perform */
delegate_t delegate;
/*! Mutex to control access to job packets */
std::mutex m;
/*!
* Condition variable through which to notify workers
* that jobs are available
*/
std::condition_variable cv;
/*!
* Count of the number of workers still to complete
* their job packets
*/
std::size_t remain;
/*!
* Convenience function: waits until the current batch of jobs has
* completed, then rethrows any exception propagated from the threads
*
* \param lk The lock on which to wait
* \param to_check Number of worker slots to check for pending exceptions
*/
void wait_raise(std::unique_lock<std::mutex>& lk, const std::size_t to_check);
/*!
* Dispatch jobs to the given delegate
* \param id index into the params/states arrays
* to be used by this thread
*/
void dispatcher(int id);
public:
/*! Number of threads in pool */
const std::size_t threads;
/*! Construct a thread pool */
ThreadPool(std::size_t numThreads = 0);
/*!
* Destroy the thread pool.
* Waits for all executive threads to terminate.
*/
~ThreadPool();
/*!
* Execute a function in each of the threads,
* passing each one its own parameter set.
* The number of jobs (== the number of parameter sets)
* is passed in the final argument. If this exceeds
* the number of threads in the pool, the jobs will be
* sequenced into batches. manifold() will only return
* when all have been completed.
* \param delegate The function each thread should call.
* \param params An array of pointers to parameters to pass.
* \param jobs Number of jobs to run (one per parameter set).
*/
void manifold(delegate_t delegate,
void** params,
std::size_t jobs);
};
#endif
nickbailey commented Nov 18, 2017

Got an embarrassingly parallel problem and a multi-core CPU? Want to call a function on different parts of the data on different CPU cores? Here's what this class can do for you.

  • Create a persistent thread pool, defaulting to the number of CPU cores
  • Apply a function using the manifold method with different parameters on each thread (see the usage sketch after the feature list)

Features:

  • The threads are reused across each call to manifold
  • No special library requirements - just build with C++11 and -pthread
  • Different delegate functions can be supplied each time manifold is called
  • Uses std::function (so you can capture this, use lambda expressions, etc.) or just plain old static functions as delegates
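
A minimal usage sketch (illustrative only; scale(), data and args are names made up for this example, and thread_pool.h/thread_pool.cpp are assumed to be in the build):

#include <iostream>
#include "thread_pool.h"

// Hypothetical delegate: doubles the int its parameter block points to.
void scale(void* p) {
    *static_cast<int*>(p) *= 2;
}

int main() {
    int data[] {1, 2, 3, 4, 5, 6, 7, 8};
    void* args[8];
    for (int i = 0; i < 8; i++)
        args[i] = &data[i];

    ThreadPool pool;                 // 0 threads requested => one per CPU core
    pool.manifold(scale, args, 8);   // blocks until every job has completed

    for (int v : data)
        std::cout << v << ' ';       // prints: 2 4 6 8 10 12 14 16
    std::cout << '\n';
}

Build it the same way as the test above, e.g. g++ -std=c++11 thread_pool.cpp example.cpp -pthread (file names illustrative).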
