/worker.cpp Secret
Created
August 23, 2014 23:13
work-sharing executor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
template<typename T> | |
class worker { | |
public: | |
enum states {WAITING, RUNNABLE, STEALING, TERMINATED}; | |
worker(std::vector<T>& queues): worker(queues, RUNNABLE) {} | |
worker(std::vector<T>& queues, states state): tid(n++), state(state), queues(queues) { | |
thread = std::unique_ptr<std::thread>(new std::thread([this] { this->run(); })); | |
} | |
void activate() { | |
state = RUNNABLE; | |
waiters.notify_one(); | |
} | |
void terminate() { | |
if (state != TERMINATED) { | |
state = TERMINATED; | |
waiters.notify_one(); | |
thread->join(); | |
} | |
} | |
void join() { | |
if(thread) { | |
thread->join(); | |
} | |
} | |
~worker() { | |
terminate(); | |
} | |
private: | |
void run() { | |
try { | |
while (state != TERMINATED) { | |
switch (state) { | |
case WAITING: { | |
std::unique_lock<std::mutex> lock(mutex); | |
waiters.wait(lock, [&](){return state != WAITING;}); | |
} | |
break; | |
case RUNNABLE: | |
try { | |
auto next = queues[tid].dequeue()->operator()(); | |
if (next) { | |
queues[tid].enqueue(next); | |
} | |
} | |
catch (que::queue_empty_exception& e) { | |
state = STEALING; | |
} | |
catch (que::queue_closed_exception& e) { | |
state = TERMINATED; | |
} | |
break; | |
case STEALING: | |
state = steal(); | |
break; | |
case TERMINATED: | |
break; | |
} | |
} | |
} | |
catch (std::exception& e) { | |
std::cerr << e.what() << std::endl << __FILE__ << doh::GOODBYE <<std::endl; | |
} | |
} | |
states steal() { //this should be one of any number of stealing policy types at some later date | |
for(int seek = 0; seek < queues.size(); ++seek) { | |
try { | |
if (!queues[seek].empty() & (seek != tid)) { | |
queues[tid].enqueue(queues[seek].dequeue()); | |
return RUNNABLE; | |
} | |
} | |
catch(que::queue_empty_exception& e) { | |
return STEALING; | |
} | |
} | |
return TERMINATED; | |
}; | |
worker(const worker&) = delete; | |
void operator= (const worker&) = delete; | |
static unsigned int n; | |
unsigned int tid; | |
states state; | |
std::vector<T>& queues; | |
std::unique_ptr<std::thread>thread; | |
std::mutex mutex; | |
std::condition_variable waiters; | |
}; | |
template <typename T> | |
unsigned int worker<T>::n = 0; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment