Skip to content

Instantly share code, notes, and snippets.

@ifknot
Created August 23, 2014 23:13
work-sharing executor
template<typename T>
class worker {
public:
enum states {WAITING, RUNNABLE, STEALING, TERMINATED};
worker(std::vector<T>& queues): worker(queues, RUNNABLE) {}
worker(std::vector<T>& queues, states state): tid(n++), state(state), queues(queues) {
thread = std::unique_ptr<std::thread>(new std::thread([this] { this->run(); }));
}
void activate() {
state = RUNNABLE;
waiters.notify_one();
}
void terminate() {
if (state != TERMINATED) {
state = TERMINATED;
waiters.notify_one();
thread->join();
}
}
void join() {
if(thread) {
thread->join();
}
}
~worker() {
terminate();
}
private:
void run() {
try {
while (state != TERMINATED) {
switch (state) {
case WAITING: {
std::unique_lock<std::mutex> lock(mutex);
waiters.wait(lock, [&](){return state != WAITING;});
}
break;
case RUNNABLE:
try {
auto next = queues[tid].dequeue()->operator()();
if (next) {
queues[tid].enqueue(next);
}
}
catch (que::queue_empty_exception& e) {
state = STEALING;
}
catch (que::queue_closed_exception& e) {
state = TERMINATED;
}
break;
case STEALING:
state = steal();
break;
case TERMINATED:
break;
}
}
}
catch (std::exception& e) {
std::cerr << e.what() << std::endl << __FILE__ << doh::GOODBYE <<std::endl;
}
}
states steal() { //this should be one of any number of stealing policy types at some later date
for(int seek = 0; seek < queues.size(); ++seek) {
try {
if (!queues[seek].empty() & (seek != tid)) {
queues[tid].enqueue(queues[seek].dequeue());
return RUNNABLE;
}
}
catch(que::queue_empty_exception& e) {
return STEALING;
}
}
return TERMINATED;
};
worker(const worker&) = delete;
void operator= (const worker&) = delete;
static unsigned int n;
unsigned int tid;
states state;
std::vector<T>& queues;
std::unique_ptr<std::thread>thread;
std::mutex mutex;
std::condition_variable waiters;
};
template <typename T>
unsigned int worker<T>::n = 0;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment