Skip to content

Instantly share code, notes, and snippets.

@djg
Created March 3, 2020 03:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save djg/975cebaa3739e1ffb6924c7dce7bb8f9 to your computer and use it in GitHub Desktop.
Save djg/975cebaa3739e1ffb6924c7dce7bb8f9 to your computer and use it in GitHub Desktop.
Sean Parent - OKish Task System with Task Stealing in C++14
/* Horrible task system - task stealing */
#include <atomic>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>
// NOTE(review): a file-scope using-directive makes every std name visible
// unqualified in the rest of this file; tolerable in a single-file sample,
// but it should not be copied into a header.
using namespace std;
// Shorthand for an owning, movable lock on a std::mutex (the lock type that
// condition_variable::wait requires).
using lock_t = unique_lock<mutex>;
/// Mutex-protected FIFO of type-erased `void()` tasks.
///
/// Offers blocking access (`push` / `pop`), non-blocking access that gives up
/// when the mutex is contended (`try_push` / `try_pop`), and a one-way
/// shutdown flag (`done`) that releases consumers blocked in `pop`.
class notification_queue {
    using lock_type = std::unique_lock<std::mutex>;

    std::deque<std::function<void()>> _q;  // pending tasks, oldest at the front
    std::mutex _mutex;                     // guards _q and _done
    std::condition_variable _ready;        // signalled on new work and on shutdown
    bool _done{false};                     // set once by done(); never cleared

public:
    /// Marks the queue as finished and wakes every thread blocked in pop().
    ///
    /// Tasks that are still queued remain retrievable; pop() only starts
    /// returning false once the queue is empty.
    void done() {
        {
            lock_type lock{_mutex};
            _done = true;
        }
        // A consumer that is already waiting in pop() re-checks _done only
        // when it is woken, so the flag change has to be broadcast.
        _ready.notify_all();
    }

    /// Blocks until a task is available or done() has been called.
    ///
    /// @param x receives the oldest queued task on success.
    /// @return true if a task was moved into `x`; false if the queue is
    ///         empty and finished.
    bool pop(std::function<void()>& x) {
        lock_type lock{_mutex};
        _ready.wait(lock, [this] { return !_q.empty() || _done; });
        if (_q.empty()) return false;
        x = std::move(_q.front());
        _q.pop_front();
        return true;
    }

    /// Appends a task, waiting for the mutex if necessary, and wakes one
    /// waiting consumer.
    template <typename F>
    void push(F&& f) {
        {
            lock_type lock{_mutex};
            _q.emplace_back(std::forward<F>(f));
        }
        // Notify after releasing the lock so the woken thread can take it at once.
        _ready.notify_one();
    }

    /// Non-blocking pop.
    ///
    /// @param x receives the oldest queued task on success.
    /// @return false, leaving `x` untouched, if the mutex is currently held
    ///         by another thread or the queue is empty.
    bool try_pop(std::function<void()>& x) {
        lock_type lock{_mutex, std::try_to_lock};
        if (!lock || _q.empty()) return false;
        x = std::move(_q.front());
        _q.pop_front();
        return true;
    }

    /// Non-blocking push.
    ///
    /// @return false if the mutex is currently held by another thread; in
    ///         that case `f` has not been moved from or copied.
    template <typename F>
    bool try_push(F&& f) {
        {
            lock_type lock{_mutex, std::try_to_lock};
            if (!lock) return false;
            _q.emplace_back(std::forward<F>(f));
        }
        _ready.notify_one();
        return true;
    }
};
class task_system {
const unsigned K = 10;
const unsigned _count{thread::hardware_concurrency()};
vector<thread> _threads;
vector<notification_queue> _q{_count};
atomic<unsigned> _index{0};
void run(unsigned i) {
while (true) {
function<void()> f;
for (unsigned n = 0; n != _count; ++n) {
if (_q[(i + n) % _count].try_pop(f)) break;
}
if (!f && !_q[i].pop(f)) break;
f();
}
}
public:
task_system() {
for (unsigned n = 0; n != _count; ++n) {
_threads.emplace_back([&, n]{ run(n); });
}
}
~task_system() {
for (auto& e : _q) e.done();
for (auto& e : _threads) e.join();
}
template <typename F>
void async_(F&& f) {
auto i = _index++;
for (unsigned n = 0; n != _count * K; ++n) {
if (_q[(i + n) % _count].try_push(forward<F>(f))) return;
}
_q[i % _count].push(forward<F>(f));
}
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment