Last active October 24, 2018 08:32
ThreadPool: Use all your cores
* Test the thread_pool class
* Compile with g++ -I.. ../thread_pool.cpp thread_pool-test.cpp -pthread
#include <iostream>
#include <mutex>
#include <exception>
#include "thread_pool.h"
using namespace std;
// Pool size requested by this test; zero asks for the platform optimum.
constexpr std::size_t threads = 0;
// How many jobs each manifold() call is given.
constexpr std::size_t jobs = 3;
// Each delegate function will take a C-string and an int.
// Parameter block handed to a delegate.
// Kept as a plain aggregate (no constructors, no default member
// initializers) so that brace initialization such as Args {"One", 1} works.
struct Args {
    const char *word;   // text to print
    int num;            // how many times to print it
};
// We're going to fire off the thread pool four times
// with four different argument sets.
// Using void** avoids typecasting later.
// Alternatively, cast to void** in manifold() call.
// One row of argument blocks per manifold() call, `jobs` blocks per row.
// Storing the pointers as void* lets a row decay straight to void**.
// The third row carries a negative count so that say() throws.
void* words[][jobs] = {
    { new Args {"One", 1},  new Args {"Two", 2},  new Args {"Three", 3} },
    { new Args {"Four", 1}, new Args {"Five", 2}, new Args {"Six", 3} },
    { new Args {"One", 1},  new Args {"Two", -2}, new Args {"Three", 3} },
    { new Args {"Unus", 1}, new Args {"Duo", 2},  new Args {"Tres", 3} },
};
// The delegate function accepts a void*
// which it casts to point at its particular argument type.
// Alternatively, cast to ThreadPool::delegate_t in manifold() call.
// Print the word from an Args block, tab-separated, once per requested
// repetition, followed by a newline.
//   words  points at the Args block to print
// Throws std::invalid_argument when the repetition count is negative.
void say(void* words) {
    const Args* const w = static_cast<Args*>(words);
    const int how_many {w->num};
    if (how_many < 0)
        throw std::invalid_argument("Iteration count is negative");

    // A single function-local mutex serialises every caller, so the
    // output of one call is never interleaved with another's.
    static std::mutex m;
    std::lock_guard<std::mutex> lk(m);
    for (int i = 0 ; i < how_many ; i++)
        std::cout << w->word << '\t';
    std::cout << std::endl;
}
int main()
// Requesting 0 threads causes the number to be
// equal to the number of CPU cores available on this platform
ThreadPool p(threads);
cout << p.threads << " worker threads are available on this platform.\n\n";
// Direct invocation
// minifold will return the number of jobs actually run,
// so the following formula ensures maximum concurrency
p.manifold(ThreadPool::delegate_t(say), words[0], jobs);
cout << endl;
// Invocation via lambda expression
// permits use of, e.g., non-static member functions
auto d { [](void* w){say(w);} };
p.manifold(d, words[1], jobs);
cout << endl;
// Exception handling
try {
p.manifold(d, words[2], jobs);
} catch(const std::invalid_argument& ia) {
std::cerr << "Invalid_argument: " << ia.what() << std::endl;
cout << endl;
// Foreign language support
// (checks exceptions are cleared properly)
p.manifold(d, words[3], jobs);
cout << endl;
cout << "\nFinished\n";
// I should probably delete the Args in words[][] now...
return 0;
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include "thread_pool.h"
//#include <iostream>
namespace {

// Choose the worker count: the caller's request if non-zero, otherwise the
// platform's reported concurrency. std::thread::hardware_concurrency() may
// return 0 when the value cannot be determined; fall back to one worker so
// the pool is never empty.
std::size_t resolve_thread_count(std::size_t requested)
{
    if (requested)
        return requested;
    const unsigned hw = std::thread::hardware_concurrency();
    return hw ? hw : 1;
}

} // namespace

// Construct the pool and start its worker threads.
//   numThreads  number of workers to create; 0 selects the platform's
//               reported concurrency (at least 1)
// Every worker starts in the `waiting` state and runs dispatcher() with its
// own index.
ThreadPool::ThreadPool(std::size_t numThreads)
    : remain(0)
    , threads(resolve_thread_count(numThreads))
{
    // Per-worker bookkeeping is fully set up before any thread is started.
    states.reset(new enum state[threads]);
    exceptions.reset(new std::exception_ptr[threads]);
    for (std::size_t i {0} ; i < threads ; i++)
        states[i] = waiting;

    workers.reset(new std::thread[threads]);
    for (std::size_t i {0} ; i < threads ; i++)
        workers[i] = std::thread(&ThreadPool::dispatcher, this, static_cast<int>(i));
}
// Shutdown sequence.
// NOTE(review): the enclosing function header is not visible in this view;
// presumably this is the body of ThreadPool::~ThreadPool() - confirm.
// Tell all the threads to die
std::unique_lock<std::mutex> lk(m);
// Block until no job is outstanding before touching any worker state.
cv.wait(lk, [this]{ return remain==0; });
// Mark every worker as dying; dispatcher() tests for this state once its
// wait on cv is satisfied.
for (std::size_t i {0} ; i < threads ; i++)
states[i] = dying;
// Count every worker as outstanding again.
remain = threads;
// NOTE(review): no release of lk or notification of cv is visible before
// this loop, and the loop body itself is not visible; presumably it joins
// workers[i] - confirm against the complete file.
for (std::size_t i {0} ; i < threads ; i++) {
// Wait on cv until remain reaches zero, then inspect the exception slots of
// the first to_check workers.
//   lk        lock handed to cv.wait(); it is released while blocked
//   to_check  number of exception slots to inspect, starting at index 0
void ThreadPool::wait_raise(std::unique_lock<std::mutex>& lk, const std::size_t to_check)
cv.wait(lk, [this]{ return remain==0; });
for (std::size_t i {0} ; i < to_check ; i++)
// NOTE(review): the statement guarded by this test is not visible here;
// presumably it rethrows the stored exception - confirm.
if (exceptions[i])
// Publish a delegate and its parameter array, then set workers running in
// batches of at most `threads` until every job has been handed out.
//   delegate  callable stored in this->delegate
//   params    array of parameter-block pointers, stored in this->params
//   jobs      number of parameter blocks to process
void ThreadPool::manifold(delegate_t delegate,
void** params,
std::size_t jobs)
this->params = params;
this->delegate = delegate;
// NOTE(review): remain is written here before m is locked - confirm no
// worker can touch it at this point.
remain = 0;
// Size of the batch most recently started; zero before the first batch.
std::size_t to_start {0};
// Clear any old exceptions before reusing the threads
for (std::size_t i {0} ; i < threads ; i++)
exceptions[i] = nullptr;
std::unique_lock<std::mutex> lk(m);
// Each pass waits for the previous batch, steps past its parameter blocks,
// then starts the next batch. `jobs` still includes the batch just started
// when the condition is tested, so the loop ends once that batch covers
// everything that is left.
while(remain < jobs) {
wait_raise(lk, to_start);
this->params += to_start;
jobs -= to_start;
// NOTE(review): std::min is declared in <algorithm>, which is not among the
// includes visible for this file - confirm it is pulled in.
to_start = std::min(jobs, this->threads);
// The loop bound already guarantees i < to_start, so the conditional below
// always selects `running`.
for (std::size_t i {0} ; i < to_start ; i++)
states[i] = i < to_start ? running : waiting;
remain = to_start;
// NOTE(review): no notification of cv is visible after the states are
// updated - confirm how the workers are woken.
// Wait for the final batch and check its exception slots.
wait_raise(lk, to_start);
// Worker loop run by each pool thread.
//   id  index of this worker's slot in the states and exceptions arrays
void ThreadPool::dispatcher(int id)
// The lock object is created without locking m (std::defer_lock).
// NOTE(review): no explicit lk.lock() is visible before the cv.wait() call
// below, which requires the lock to be held - confirm.
std::unique_lock<std::mutex> lk(m, std::defer_lock);
while(true) {
// Sleep until this worker's state is changed away from `waiting`.
cv.wait(lk, [&id,this]{ return states[id] != waiting; });
// Time to die?
if (states[id] == dying) {
states[id] = dead;
// NOTE(review): the statement that leaves the loop after marking the
// worker dead is not visible here.
// Call delegate, passing parameter block
// NOTE(review): the body of this try block is not visible here.
try {
} catch(...) {
// Keep whatever was thrown in this worker's exception slot.
exceptions[id] = std::current_exception();
// Mark this worker idle again.
states[id] = waiting;
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <stdexcept>
* A utility class to permit a single function to be run on multiple
* datasets concurrently.
* The class may be instantiated with the number of concurrent threads
* to run. If 0 threads are requested (the default value), the number
* is chosen to equal the inherent concurrency of the platform
* on which the application will run.
* The caller must prepare an array of parameter blocks ("jobs"). These
* along with the delegate function are passed to the manifold()
* method, and each concurrent invocation of the delegate function
* will be passed a separate parameter block.
* The number of jobs must be defined in the manifold() call
* but need not be equal to the number of threads. If the number
* of jobs exceeds the number of threads, work will be performed
* in batches. The size of each batch is the number of threads
* given when the ThreadPool object is created. manifold()
* does not return until all jobs are complete.
* Threads are reused across calls to manifold(). When the ThreadPool
* is destroyed, it closes down all threads and awaits their
* proper termination.
/*! Pool of worker threads that applies one delegate to many parameter blocks. */
class ThreadPool {
public:
    /*! The type of the delegate function each thread will run */
    typedef std::function<void(void*)> delegate_t;

private:
    /*! thread state descriptor */
    enum state {waiting, running, dying, dead};

    /*! Worker threads*/
    std::unique_ptr<std::thread[]> workers;

    /*! State variables for each thread */
    std::unique_ptr<enum state[]> states;

    /*! exception_ptrs for each thread */
    std::unique_ptr<std::exception_ptr[]> exceptions;

    /*! Parameters for each thread, passed to the delegate; set by manifold() */
    void** params = nullptr;

    /*! The work the workers are supposed to perform */
    delegate_t delegate;

    /*! Mutex to control access to job packets */
    std::mutex m;

    /**
     * Condition variable through which to notify workers
     * that jobs are available
     */
    std::condition_variable cv;

    /**
     * Count of the number of workers still to complete
     * their job packets
     */
    std::size_t remain;

    /**
     * Convenience function: waits for a lock then deals with
     * any exceptions propagated from the threads
     * \param lk The lock on which to wait
     * \param to_check Number of threads running on this lock
     */
    void wait_raise(std::unique_lock<std::mutex>& lk, const std::size_t to_check);

    /**
     * Dispatch jobs to the given delegate
     * \param id index into the params/states arrays
     *           to be used by this thread
     */
    void dispatcher(int id);

public:
    /*! Number of threads in pool */
    const std::size_t threads;

    /*! Construct a thread pool */
    ThreadPool(std::size_t numThreads = 0);

    /**
     * Destroy the thread pool.
     * Waits for all executive threads to terminate.
     */
    ~ThreadPool();

    /*!
     * Not copyable: the worker threads are started with a pointer
     * to this object, and the mutex member cannot be copied.
     */
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    /**
     * Execute a function in each of the threads,
     * passing each one its own parameter set.
     * The number of jobs (== the number of parameter sets)
     * is passed in the final argument. If this exceeds
     * the number of threads in the pool, the jobs will be
     * sequenced into batches. manifold() will only return
     * when all have been completed.
     * \param delegate The function each thread should call.
     * \param params An array of pointers to parameters to pass.
     * \param jobs Number of jobs (parameter sets) to run.
     */
    void manifold(delegate_t delegate,
                  void** params,
                  std::size_t jobs);
};
nickbailey commented Nov 18, 2017

Got an embarrassingly parallel problem and a multi-core CPU? Want to call a function on different parts of the data on different CPU cores? Here's what this class can do for you.

  • Create a persistent thread pool, defaulting to the number of CPU cores
  • Apply a function using the manifold method with different parameters on each thread


  • The threads are reused across each call to manifold
  • No special library requirements: just build as C++11 (or later) with -pthread
  • Different delegate functions can be supplied each time manifold is called
  • Delegates are std::function objects, so you can pass lambda expressions (including ones that capture this) or just plain old static functions

