Skip to content

Instantly share code, notes, and snippets.

@tobyp
Last active December 19, 2015 17:19
Show Gist options
  • Save tobyp/5990764 to your computer and use it in GitHub Desktop.
Save tobyp/5990764 to your computer and use it in GitHub Desktop.
A smidgen of scheduling madness. Any feedback would be great, especially if you see a problem.
#include <chrono>
#include <condition_variable>
#include <deque>
#include <list>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>
namespace scheduling {
template <class Clock>
class Scheduler {
typedef Clock clock_type;
typedef typename clock_type::time_point time_point;
typedef typename clock_type::duration duration;
typedef std::function<void()> task_type;
private:
struct Task {
public:
Task (task_type&& task, const time_point& start, const duration& repeat) : task(std::move(task)), start(start), repeat(repeat) { }
task_type task;
time_point start;
duration repeat;
bool operator<(const Task& other) const {
return start < other.start;
}
};
public:
typedef typename std::list<Task>::iterator task_handle;
private:
std::mutex mutex;
std::condition_variable tasks_updated;
std::deque<task_handle> todo;
std::condition_variable modified;
bool running;
std::list<Task> tasks;
std::list<task_handle> handles;
std::vector<std::thread> threads;
public:
Scheduler() : threads(4) {
}
~Scheduler() {
halt();
}
task_handle schedule(task_type&& task, const time_point& start, const duration& repeat=duration::zero()) {
task_handle h;
{
std::lock_guard<std::mutex> lk(mutex);
h = tasks.emplace(tasks.end(), std::move(task), start, repeat);
handles.push_back(h);
}
tasks_updated.notify_all();
return h;
}
task_handle schedule(task_type&& task, const duration& delay=duration::zero(), const duration& repeat=duration::zero()) {
return schedule(std::move(task, clock_type::now()+delay, repeat));
}
void unschedule(const task_handle& handle) {
{
std::lock_guard<std::mutex> lk(mutex);
auto handle_it = std::find(handles.begin(), handles.end(), handle);
if (handle_it != handles.end()) {
tasks.erase(handle);
todo.remove(handle);
handles.erase(handle_it);
}
}
tasks_updated.notify_all();
}
void clear() {
{
std::lock_guard<std::mutex> lk(mutex);
tasks.clear();
handles.clear();
}
tasks_updated.notify_all();
}
void run() {
{
std::lock_guard<std::mutex> lk(mutex);
if (running) return;
running = true;
for (auto& t : threads) {
t = std::thread([this]{this->loop();});
}
}
while (true) {
std::unique_lock<std::mutex> lk(mutex);
if (!running) break;
auto task_it = min_element(tasks.begin(), tasks.end());
time_point next_task = task_it == tasks.end() ? clock_type::time_point::max() : task_it->start;
if (tasks_updated.wait_until(lk, next_task) == std::cv_status::timeout) {
if (task_it->repeat != clock_type::duration::zero()) {
task_it->start += task_it->repeat;
}
else {
handles.remove(task_it);
tasks.erase(task_it);
}
todo.push_back(task_it);
modified.notify_all();
}
}
for (auto& t : threads) {
t.join();
}
}
void halt() {
{
std::lock_guard<std::mutex> lk(mutex);
if (!running) return;
running = false;
}
tasks_updated.notify_all();
modified.notify_all();
}
private:
void loop() {
while (true) {
std::function<void()> f;
{
std::unique_lock<std::mutex> lk(mutex);
while (todo.empty() && running) {
modified.wait(lk);
}
if (!running) {
return;
}
f = todo.front()->task;
todo.pop_front();
}
f();
}
}
};
}
#include <iostream>
void outp(const std::string& outp) {
static std::mutex m;
std::lock_guard<std::mutex> lk(m);
std::cout << std::this_thread::get_id() << ": " << outp << std::endl;
}
int main(int argc, char* argv[]) {
scheduling::Scheduler<std::chrono::steady_clock> sched;
sched.schedule([&sched]{outp("Task 1");}, std::chrono::steady_clock::now());
sched.schedule([&sched]{outp("Task 2");}, std::chrono::steady_clock::now()+std::chrono::seconds(2), std::chrono::seconds(2));
sched.schedule([&sched]{outp("Task 3");}, std::chrono::steady_clock::now()+std::chrono::seconds(2), std::chrono::seconds(2));
sched.schedule([&sched]{outp("Task 4");}, std::chrono::steady_clock::now()+std::chrono::seconds(2), std::chrono::seconds(2));
sched.schedule([&sched]{outp("Task 5");}, std::chrono::steady_clock::now()+std::chrono::seconds(2), std::chrono::seconds(2));
sched.schedule([&sched]{outp("Task 6");}, std::chrono::steady_clock::now()+std::chrono::seconds(3));
sched.schedule([&sched]{outp("Task 7");}, std::chrono::steady_clock::now()+std::chrono::seconds(3));
sched.schedule([&sched]{outp("Task 8");}, std::chrono::steady_clock::now()+std::chrono::seconds(3));
sched.schedule([&sched]{outp("Task 9");}, std::chrono::steady_clock::now()+std::chrono::seconds(3));
sched.schedule([&sched]{outp("Task 10"); sched.halt(); }, std::chrono::steady_clock::now()+std::chrono::seconds(5));
sched.run();
}
merged into scheduling.cpp
merged into scheduling.cpp (at the bottom)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment