@Klowner
Last active September 29, 2017 02:53
Lock-free C++ single producer job scheduler thing that probably is secretly broken
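
// Overview: a single producer thread push()es monotonically increasing ticks.
// Each push fills one FrameJob (one job per registered task) in a tiny
// double-buffered queue, and worker threads calling exec() race to claim
// individual jobs with compare-and-swap. When the last job in a frame
// finishes, the frame is marked complete and the queue advances to the next.
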
#include <atomic>
#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>

class Scheduler
{
public:
    Scheduler()
        : m_running(true)
    {}

    // Register a task; every registered task is run once for each pushed tick.
    void registerTask(void (*_fn)(uint64_t)) noexcept
    {
        m_tasks.push_back(_fn);
    }

    // Producer side: allocate the next frame slot, fill it with one job per
    // registered task, and publish it to the workers. Returns false when the
    // queue is full.
    bool push(uint64_t _t) noexcept
    {
        FrameJob* framejob;
        unsigned int id;
        if (m_jobQueue.alloc(&framejob, id)) {
            // The job queue may hand a framejob back which is complete but
            // also has a sleeping thread holding a reference to it. In that
            // rare situation (or with thread-sanitizer), we spin for a moment
            // until that thread gets the heck off this framejob.
            while (framejob->readers.load() > 0)
            {
                // waiting on workers to finish.
            }

            unsigned int i = 0;
            for (auto& task : m_tasks) {
                framejob->jobs[i].fn = task;
                framejob->jobs[i].state.store(STATE_READY);
                ++i;
            }
            framejob->numJobs = i;
            framejob->t = _t;
            framejob->pending.store(i);
            m_jobQueue.commit();
            return true;
        }
        return false;
    }

    // Worker side: look at the active frame, try to claim one READY job via
    // compare-and-swap, run it, and mark the frame complete once its last job
    // has finished.
    void exec() noexcept
    {
        FrameJob* framejob = nullptr;
        if (m_jobQueue.acquire(&framejob)) {
            auto pending = framejob->pending.load(std::memory_order_acquire);
            if (pending > 0) {
                auto& job = framejob->getNextJob();
                switch (auto expect = job.state.load()) {
                case STATE_READY:
                {
                    if (std::atomic_compare_exchange_weak(&job.state, &expect, static_cast<uint_fast8_t>(STATE_BUSY))) {
                        job.fn(framejob->t);
                        job.state.store(STATE_FINISHED);
                        if (framejob->pending.fetch_sub(1u) == 1u) {
                            m_jobQueue.complete(framejob->index);
                        }
                    }
                } break;
                default: break;
                }
            }
            m_jobQueue.release(framejob);
        }
    }

    bool running() const noexcept {
        return m_running.load();
    }

    void shutdown() noexcept {
        m_running.store(false);
    }

    bool busy() const noexcept {
        return m_jobQueue.m_read_complete.load() != m_jobQueue.m_write.load();
    }

protected:
    // One frame's worth of work: a fixed array of jobs plus bookkeeping for how
    // many jobs are still pending and how many worker threads currently hold a
    // reference to the frame.
    struct FrameJob {
        FrameJob()
            : pending(0)
            , readers(0)
            , jobIndex(0)
        {}

        struct Job {
            void (*fn)(uint64_t _t);
            std::atomic<uint_fast8_t> state;
        };

        Job& getNextJob() noexcept {
            return jobs[jobIndex.fetch_add(1u) % numJobs];
        }

        int addRef() noexcept { return readers.fetch_add(1u); }
        int delRef() noexcept { return readers.fetch_sub(1u); }

        Job jobs[32]; // fixed capacity: push() assumes at most 32 registered tasks
        uint64_t t;
        unsigned int index;
        int numJobs;
        std::atomic<int> pending;
        std::atomic<int> readers;
        std::atomic<unsigned int> jobIndex;
    };

    // Per-job lifecycle: READY (set by push) -> BUSY (claimed by a worker) -> FINISHED.
    enum States {
        STATE_UNUSED,
        STATE_READY,
        STATE_BUSY,
        STATE_FINISHED,
    };

    // Minimal double-buffered frame queue. m_write counts allocated frames,
    // m_read_max counts committed (published) frames, m_read is the frame
    // workers currently draw jobs from, and m_read_complete steers m_read
    // forward once a frame has finished.
    struct FrameJobQueue {
        static constexpr const unsigned int NUM_BUFFERS = 2;

        FrameJobQueue()
            : m_read_complete(0)
            , m_read_max(0)
            , m_read(0)
            , m_write(0)
        {
            for (unsigned int i = 0; i < NUM_BUFFERS; ++i) {
                m_buffers[i].index = i;
            }
        }

        unsigned int index(unsigned int _i) noexcept {
            return _i % NUM_BUFFERS;
        }

        // Allocate space for a single item.
        bool alloc(FrameJob** _dst, unsigned int& _commit) noexcept {
            auto current_write = m_write.load();
            auto current_read = m_read.load();
            do {
                if (index(current_write + 1u) == index(current_read)) {
                    *_dst = nullptr;
                    _commit = (unsigned int)-1;
                    return false;
                }
            } while (!std::atomic_compare_exchange_weak(&m_write, &current_write, (current_write + 1)));
            *_dst = &m_buffers[index(current_write)];
            (*_dst)->index = current_write + 1;
            _commit = current_write;
            return true;
        }

        // Publish the most recently allocated item to readers.
        inline void commit() noexcept {
            m_read_max.fetch_add(1u, std::memory_order_relaxed);
        }

        // Record that the frame identified by _index has finished all of its jobs.
        void complete(const unsigned int _index) noexcept {
            auto expect = _index;
            while (!std::atomic_compare_exchange_weak(&m_read_complete, &expect, _index))
            {}
        }

        // Hand back the active frame with a reference held, or advance m_read
        // past a finished frame and report that there is nothing to do yet.
        bool acquire(FrameJob** _dst) noexcept {
            auto current_read = m_read.load();
            auto current_read_max = m_read_max.load();
            auto current_read_complete = m_read_complete.load();
            // If there's nothing to advance to, bail out.
            if (index(current_read) == index(current_read_max)) {
                return false;
            }
            // active job is current
            if (index(current_read_complete) == index(current_read)) {
                *_dst = &m_buffers[index(current_read)];
                (*_dst)->addRef();
                return true;
            } else {
                std::atomic_compare_exchange_weak(&m_read, &current_read, current_read_complete);
                return false;
            }
        }

        unsigned int release(FrameJob* _job) noexcept {
            return _job->delRef();
        }

        FrameJob m_buffers[NUM_BUFFERS];
        std::atomic<unsigned int> m_read_complete;
        std::atomic<unsigned int> m_read_max;
        std::atomic<unsigned int> m_read;
        std::atomic<unsigned int> m_write;
    };

    std::vector<void (*)(uint64_t)> m_tasks;
    std::atomic<bool> m_running;
    FrameJobQueue m_jobQueue;
};

void task_a(uint64_t _t) noexcept
{
    std::cout << "task a, tick " << _t << std::endl;
}

void task_b(uint64_t _t) noexcept
{
    std::cout << "task b, tick " << _t << std::endl;
}

void task_c(uint64_t _t) noexcept
{
    std::cout << "task c, tick " << _t << std::endl;
}

int main(int argc, char** argv)
{
    (void)argc;
    (void)argv;

    Scheduler scheduler;
    scheduler.registerTask(task_a);
    scheduler.registerTask(task_b);
    scheduler.registerTask(task_c);

    // Spin up two worker threads that drain jobs until shutdown.
    std::vector<std::thread> threads;
    for (unsigned int tid = 0; tid < 2; ++tid) {
        threads.emplace_back([&]() {
            while (scheduler.running()) {
                scheduler.exec();
            }
        });
    }

    // Single producer: keep pushing ticks, retrying whenever the queue is full.
    uint64_t tick = 0;
    while (tick < 10000) {
        if (scheduler.push(tick)) {
            tick++;
        }
    }

    // Wait for the last pushed frame to finish before shutting down.
    while (scheduler.busy())
    {
        // spin! boo boo, spin!
    }

    scheduler.shutdown();
    std::cout << "shutting down" << '\n';
    for (auto& t : threads) t.join();
    return 0;
}
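
Build note: the code above is plain C++11 plus a threading library, so something along the lines of "g++ -std=c++11 -pthread -O2 scheduler.cpp" (filename assumed) should be enough to compile and run it.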