@zxytim
Created January 25, 2014 04:10
/*
 * $File: class_uniq_thread_job_pool.hh
 * $Date: Sat Jan 18 14:33:53 2014 +0800
 * $Author: Xinyu Zhou <zxytim[at]gmail[dot]com>
 */
#pragma once
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>
#include <unordered_map>
#include <map>
#include <set>
#include "multi_put_single_get_job_queue.hh"
/*!
 * A thread pool that supports tasks with classes (a concept, not a C++
 * class), under the constraint that only a single task of a class may be
 * executed at a time; that is, a task belonging to a class is exclusive
 * with other tasks of the same class.
 *
 * To avoid introducing a global lock when adding jobs, job assignment is
 * done by hashing the class_type: each job is dispatched to the worker
 * chosen by the hash value of its class. Therefore, class_type must be
 * hashable.
 *
 * Caveat:
 * 1. While this thread pool can handle an enormous number of class_types,
 *    its performance is not guaranteed when the number of class_types is
 *    small: the load of the workers may be unbalanced.
 */
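/*
 * Usage sketch (illustrative only; assumes int is used as the class_type).
 * Jobs put with the same class id are serialized on one worker; jobs whose
 * ids hash to different workers may run concurrently.
 *
 *     class_uniq_thread_job_pool<int> pool(4);
 *     auto fut = pool.put(42, [](int x) { return x * x; }, 7);
 *     pool.put(42, [] { return 1; });  // serialized after the task above
 *     int result = fut.get();          // 49
 */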
template<class class_type, class hash_func = std::hash<class_type>>
class class_uniq_thread_job_pool {
public:
    class_uniq_thread_job_pool(size_t nr_thread,
            const hash_func &hasher = hash_func());

    template<class F, class... Args>
    auto put(const class_type &ctype, F &&f, Args &&...args)
        -> std::future<typename std::result_of<F(Args...)>::type>;

    ~class_uniq_thread_job_pool();

protected:
    class pool_worker {
    public:
        pool_worker() :
            stop(false) {
        }
        void set_stop();
        void run();
        void operator () () { run(); }

        //! this queue is thread safe for putting things in
        multi_put_single_get_job_queue<std::function<void()>> job_queue;
        std::condition_variable condition;
        std::mutex notification_mutex;
        bool stop;
    };

    std::vector<std::thread> worker_threads;
    std::vector<std::shared_ptr<pool_worker>> workers;
    hash_func hasher;

    void set_stop();
};
template<class class_type, class hash_func>
class_uniq_thread_job_pool<class_type, hash_func>::class_uniq_thread_job_pool(
        size_t nr_thread, const hash_func &hasher) :
    hasher(hasher) {
    // one worker object and one thread per slot; jobs are later dispatched
    // to a slot by hash
    for (size_t i = 0; i < nr_thread; i ++) {
        workers.emplace_back(std::make_shared<pool_worker>());
        worker_threads.emplace_back(
                std::bind(&pool_worker::run, workers.back().get()));
    }
}
template<class class_type, class hash_func>
template<class F, class... Args>
auto class_uniq_thread_job_pool<class_type, hash_func>::put(
        const class_type &class_id, F &&f, Args &&...args)
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    typedef typename std::result_of<F(Args...)>::type return_type;
    // if (stop)
    //     throw std::runtime_error("put on stopped class_uniq_thread_job_pool");
    auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    std::future<return_type> ret = task->get_future();

    // dispatch by hash: tasks of the same class always land on the same
    // worker, so they never run concurrently
    size_t assigned_worker_id = hasher(class_id) % workers.size();
    auto &assigned_worker = *workers[assigned_worker_id];
    assigned_worker.job_queue.put(
            [task]() { (*task)(); });  // thread safe when putting in
    assigned_worker.condition.notify_one();
    return ret;
}
template<class class_type, class hash_func>
void class_uniq_thread_job_pool<class_type, hash_func>::set_stop() {
    for (auto &worker: workers)
        worker->set_stop();
}

template<class class_type, class hash_func>
class_uniq_thread_job_pool<class_type, hash_func>::~class_uniq_thread_job_pool() {
    set_stop();
    for (auto &th: worker_threads)
        th.join();
}
template<class class_type, class hash_func>
void class_uniq_thread_job_pool<class_type, hash_func>::pool_worker::run() {
    try {
        for (; ;) {
            // get() blocks until a job is available, locking only when its
            // local queue is empty; it throws JobFinished once the queue
            // has been stopped and drained
            std::function<void()> job(job_queue.get());
            job_queue.pop();
            job();
        }
    } catch (JobFinished) {
        // stop requested and all pending jobs have been executed
    }
}

template<class class_type, class hash_func>
void class_uniq_thread_job_pool<class_type, hash_func>::pool_worker::set_stop() {
    job_queue.stop();
}
// vim: syntax=cpp11.doxygen foldmethod=marker
/*
 * $File: multi_put_single_get_job_queue.hh
 * $Date: Sat Jan 18 14:22:22 2014 +0800
 * $Author: Xinyu Zhou <zxytim[at]gmail[dot]com>
 */
#pragma once
#include <cstddef>
#include <vector>
#include <memory>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <iostream>
//! thrown by the queue once it has been stopped and drained
class JobFinished {};
/*!
 * A multi-put, single-get job queue.
 *
 * This queue is thread safe when multiple threads put things in; however,
 * it is not thread safe when multiple threads try to get things out.
 * This compromise can lead to better performance, since fewer locks are
 * taken while a single thread gets things.
 */
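/*
 * Consumer-loop sketch (illustrative only): the single consumer repeatedly
 * calls get() and pop(), and treats JobFinished as the termination signal
 * raised by stop() once the queue has been drained. process() below is a
 * placeholder.
 *
 *     multi_put_single_get_job_queue<int> q;
 *     // producers (any number of threads): q.put(42);
 *     try {
 *         for (;;) {
 *             int v = q.get();  // blocks until a value is available
 *             q.pop();
 *             process(v);
 *         }
 *     } catch (JobFinished) {
 *         // stop() was called and all queued values have been consumed
 *     }
 */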
template<class value_type>
class multi_put_single_get_job_queue {
public:
    multi_put_single_get_job_queue() : m_stop(false) {
        get_queue = new std::queue<value_type>();
        buf_queue = new std::queue<value_type>();
    }

    ~multi_put_single_get_job_queue() {
        delete get_queue;
        delete buf_queue;
    }

    void put(const value_type &val) {
        {
            // if (m_stop)
            //     throw JobFinished();
            std::unique_lock<std::mutex> lock(buf_queue_mutex);
            buf_queue->push(val);
        }
        condition.notify_one();
    }

    //! blocking get; throws JobFinished once stopped and drained
    value_type &get() throw (JobFinished) {
        try_enrich_get_queue();
        return get_queue->front();
    }

    void pop() {
        try_enrich_get_queue();
        get_queue->pop();
    }

    void stop() {
        {
            std::unique_lock<std::mutex> lock(buf_queue_mutex);
            m_stop = true;
        }
        condition.notify_one();
    }

    bool is_stopped() {
        return m_stop;
    }

protected:
    std::queue<value_type> *get_queue, *buf_queue;
    std::mutex buf_queue_mutex;
    std::condition_variable condition;
    bool m_stop;

    //! refill get_queue by swapping it with buf_queue when it is empty;
    //! blocks until data arrives or stop() is called
    void try_enrich_get_queue() throw (JobFinished) {
        if (!get_queue->empty())
            return;
        {
            std::unique_lock<std::mutex> lock(buf_queue_mutex);
            while (!m_stop && buf_queue->empty())
                condition.wait(lock);
            if (m_stop && buf_queue->empty())
                throw JobFinished();
            std::swap(get_queue, buf_queue);
        }
    }
};
// vim: syntax=cpp11.doxygen foldmethod=marker