Skip to content

Instantly share code, notes, and snippets.

@yohhoy
Last active August 10, 2019 05:14
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yohhoy/ecdf00cb2a7852929954a1e6c79bdc25 to your computer and use it in GitHub Desktop.
Save yohhoy/ecdf00cb2a7852929954a1e6c79bdc25 to your computer and use it in GitHub Desktop.
"Sleep Sort" implementation with C++ Coroutines TS
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <exception>
#include <iostream>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>
#include <experimental/coroutine>
/// Fixed-size pool of worker threads that resume coroutine handles, optionally
/// not before a requested point in time.
///
/// Workers are started by the constructor and keep resuming queued handles until
/// the queue has been closed (run_loop() or the destructor) and has drained.
struct thread_pool {
    using time_point = std::chrono::system_clock::time_point;
    using coro_handle = std::experimental::coroutine_handle<>;

    /// Starts `n` worker threads. With `n <= 0` no worker exists and queued
    /// handles are never resumed.
    explicit thread_pool(int n)
    {
        try {
            for (int i = 0; i < n; i++) {
                worker_.emplace_back([this] {
                    // dequeue() yields a null handle once the queue is closed and empty,
                    // which ends this loop and lets the thread exit.
                    while (auto task = queue_.dequeue())
                        task.resume();
                });
            }
        } catch (...) {
            // Destroying a joinable std::thread calls std::terminate, so stop the
            // workers that were already started before propagating the failure.
            run_loop();
            throw;
        }
    }

    thread_pool(const thread_pool&) = delete;
    thread_pool& operator=(const thread_pool&) = delete;

    /// Closes the queue and joins the workers if run_loop() was not called,
    /// instead of terminating the program on still-joinable threads.
    ~thread_pool() { run_loop(); }

    /// Schedules `coro` to be resumed by a worker once `tp` has been reached.
    /// The default `time_point::min()` is always in the past, i.e. "as soon as possible".
    void enqueue(coro_handle coro, time_point tp = time_point::min())
    {
        queue_.enqueue(coro, tp);
    }

    /// Closes the queue and blocks until every worker has exited, which happens
    /// once the queue has drained. Handles may still be enqueued after the close
    /// (a running coroutine re-scheduling itself). Calling this again is a no-op.
    void run_loop()
    {
        queue_.close();
        for (auto&& th : worker_) {
            if (th.joinable())
                th.join();
        }
        worker_.clear();
    }

private:
    /// Blocking queue whose entries become available only after their deadline.
    template <typename T>
    struct delay_queue {
        /// Inserts `v` so that it is handed out once `tp` has passed.
        void enqueue(T v, time_point tp)
        {
            std::lock_guard lk{mtx_};
            // q_ is kept in descending deadline order, so the earliest entry sits at
            // back(). Inserting in front of every entry with deadline <= tp keeps
            // entries with equal deadlines in FIFO order and avoids a full re-sort.
            auto pos = std::lower_bound(
                q_.begin(), q_.end(), tp,
                [](const auto& entry, const time_point& t) { return entry.second > t; });
            q_.emplace(pos, std::move(v), tp);
            // The new entry may be due earlier than what a sleeping worker waits for.
            cv_.notify_one();
        }

        /// Blocks until the earliest entry is due and returns it, or returns a
        /// value-initialized `T` when the queue is closed and empty.
        T dequeue()
        {
            std::unique_lock lk{mtx_};
            for (;;) {
                if (q_.empty()) {
                    if (closed_)
                        return {};
                    cv_.wait(lk);
                } else if (q_.back().second <= time_point::clock::now()) {
                    break;
                } else {
                    // Copy the deadline: the mutex is released while waiting, so the
                    // vector element may be moved or popped by another thread, and
                    // wait_until reads its time_point argument again after waking.
                    const time_point deadline = q_.back().second;
                    cv_.wait_until(lk, deadline);
                }
            }
            T ret = std::move(q_.back().first);
            q_.pop_back();
            // Last entry of a closed queue: wake the remaining workers so they can exit.
            if (q_.empty() && closed_)
                cv_.notify_all();
            return ret;
        }

        /// Marks the queue as closed; dequeue() returns an empty value once drained.
        void close()
        {
            std::lock_guard lk{mtx_};
            closed_ = true;
            cv_.notify_all();
        }

        std::vector<std::pair<T, time_point>> q_;  // descending by deadline
        bool closed_ = false;
        std::mutex mtx_;
        std::condition_variable cv_;
    };

    delay_queue<coro_handle> queue_;
    std::vector<std::thread> worker_;
};
/// Promise type for fire-and-forget `void` coroutines whose first parameter is a
/// `thread_pool&`. The coroutine is handed to that pool at its initial suspend
/// point and again on every `co_await <duration>`.
struct void_promise {
    using handle_type = std::experimental::coroutine_handle<void_promise>;

    thread_pool* tp_ = nullptr;

    /// Receives the coroutine's own arguments; only the leading pool reference is kept.
    template <typename... Ts>
    void_promise(thread_pool& tp, Ts&&...)
        : tp_(&tp) {}

    /// The coroutine returns `void`, so there is no return object to produce.
    void get_return_object() {}

    /// Always suspends and enqueues the coroutine on the pool without a deadline.
    auto initial_suspend()
    {
        struct awaiter {
            thread_pool* tp_;
            bool await_ready() { return false; }
            void await_suspend(handle_type h)
            {
                tp_->enqueue(h);
            }
            void await_resume() {}
        };
        return awaiter{tp_};
    }

    /// Never suspends at the end, so the coroutine frame is destroyed as soon as
    /// the body finishes. Declared noexcept because C++20 requires
    /// `co_await promise.final_suspend()` to be non-throwing.
    auto final_suspend() noexcept { return std::experimental::suspend_never{}; }

    void return_void() {}

    /// Nobody can observe an exception escaping a fire-and-forget coroutine.
    void unhandled_exception() { std::terminate(); }

    /// Supports `co_await <duration>` inside the coroutine: suspends it and
    /// re-enqueues it on the pool with the deadline `now + d`.
    template <class Rep, class Period>
    auto await_transform(const std::chrono::duration<Rep, Period>& d)
    {
        struct awaiter {
            thread_pool* tp_;
            std::chrono::duration<Rep, Period> d_;
            bool await_ready() { return false; }
            void await_suspend(handle_type h)
            {
                // The deadline is computed at suspension time, not when the
                // awaiter was created.
                tp_->enqueue(h, thread_pool::time_point::clock::now() + d_);
            }
            void await_resume() {}
        };
        return awaiter{tp_, d};
    }
};
// Make every coroutine declared as void(thread_pool&, Rest...) use void_promise
// as its promise type.
namespace std {
namespace experimental {

template <typename... Rest>
struct coroutine_traits<void, thread_pool&, Rest...> {
    using promise_type = void_promise;
};

}  // namespace experimental
}  // namespace std
// Guards std::cout; locked by the COUT macro below.
std::mutex cout_mutex;
// `COUT << a << b;` expands to `std::unique_lock{cout_mutex}, std::cout << a << b;`.
// `<<` binds tighter than the comma operator, and the temporary lock created on the
// left of the comma lives until the end of the full expression, so the mutex is held
// for the whole chained output statement.
#define COUT std::unique_lock{cout_mutex}, std::cout
// Coroutine: suspends for n * 100 ms through `co_await <duration>`, then prints
// n together with the id of the thread that resumed it.
void sleep_sort(thread_pool& tp, int n)
{
    const auto delay = std::chrono::milliseconds{100} * n;
    co_await delay;
    COUT << n << " @" << std::this_thread::get_id() << std::endl;
}
// Starts one sleep_sort coroutine per input value on a two-thread pool, then
// waits for the pool to finish.
int main()
{
    thread_pool tp(2);
    const int inputs[] = {6, 2, 5, 3, 1, 4};
    for (const int value : inputs)
        sleep_sort(tp, value);
    tp.run_loop();
}
@yohhoy
Copy link
Author

yohhoy commented Apr 14, 2018

Alternate version of https://gist.github.com/yohhoy/a5ec6d4aeeb4c60d3e4f3adfd1df9ebf that uses a local thread_pool.

@yohhoy
Copy link
Author

yohhoy commented Apr 3, 2019

@yohhoy
Copy link
Author

yohhoy commented Aug 9, 2019

@yohhoy
Copy link
Author

yohhoy commented Aug 10, 2019

@yohhoy
Copy link
Author

yohhoy commented Aug 10, 2019

https://wandbox.org/permlink/X6TaqpJ4egbulzq2
Fixed a coroutine task leak and removed the redundant coro_action class.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment