Last active
August 10, 2019 05:14
-
-
Save yohhoy/ecdf00cb2a7852929954a1e6c79bdc25 to your computer and use it in GitHub Desktop.
"Sleep Sort" implementation with C++ Coroutines TS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <condition_variable> | |
#include <chrono> | |
#include <iostream> | |
#include <mutex> | |
#include <thread> | |
#include <utility> | |
#include <vector> | |
#include <experimental/coroutine> | |
struct thread_pool { | |
using time_point = std::chrono::system_clock::time_point; | |
using coro_handle = std::experimental::coroutine_handle<>; | |
explicit thread_pool(int n) | |
{ | |
for (int i = 0; i < n; i++) { | |
worker_.emplace_back([this]{ | |
while (auto task = queue_.dequeue()) | |
task.resume(); | |
}); | |
} | |
} | |
thread_pool(const thread_pool&) = delete; | |
~thread_pool() = default; | |
void enqueue(coro_handle coro, time_point tp = time_point::min()) | |
{ | |
queue_.enqueue(coro, tp); | |
} | |
void run_loop() | |
{ | |
queue_.close(); | |
for (auto&& th : worker_) | |
th.join(); | |
worker_.clear(); | |
} | |
private: | |
template <typename T> | |
struct delay_queue { | |
void enqueue(T v, time_point tp) | |
{ | |
std::lock_guard lk{mtx_}; | |
q_.emplace_back(std::move(v), tp); | |
// descending sort on time_point | |
std::sort(begin(q_), end(q_), [](auto&& a, auto&& b) { return a.second > b.second; }); | |
cv_.notify_one(); | |
} | |
T dequeue() | |
{ | |
std::unique_lock lk{mtx_}; | |
auto now = time_point::clock::now(); | |
// wait condition: (empty && closed) || (!empty && back.tp <= now) | |
while (!(q_.empty() && closed_) && !(!q_.empty() && q_.back().second <= now)) { | |
if (q_.empty()) | |
cv_.wait(lk); | |
else | |
cv_.wait_until(lk, q_.back().second); | |
now = time_point::clock::now(); | |
} | |
if (q_.empty() && closed_) | |
return {}; | |
T ret = std::move(q_.back().first); | |
q_.pop_back(); | |
if (q_.empty() && closed_) | |
cv_.notify_all(); | |
return ret; | |
} | |
void close() | |
{ | |
std::lock_guard lk{mtx_}; | |
closed_ = true; | |
cv_.notify_all(); | |
} | |
std::vector<std::pair<T, time_point>> q_; | |
bool closed_ = false; | |
std::mutex mtx_; | |
std::condition_variable cv_; | |
}; | |
delay_queue<coro_handle> queue_; | |
std::vector<std::thread> worker_; | |
}; | |
struct void_promise { | |
using handle_type = std::experimental::coroutine_handle<void_promise>; | |
thread_pool* tp_ = nullptr; | |
template <typename... Ts> | |
void_promise(thread_pool& tp, Ts&&...) | |
: tp_(&tp) {} | |
void get_return_object() {} | |
auto initial_suspend() | |
{ | |
struct awaiter { | |
thread_pool* tp_; | |
bool await_ready() { return false; } | |
void await_suspend(handle_type h) | |
{ | |
tp_->enqueue(h); | |
} | |
void await_resume() {} | |
}; | |
return awaiter{tp_}; | |
} | |
auto final_suspend() { return std::experimental::suspend_never{}; } | |
void return_void() {} | |
void unhandled_exception() { std::terminate(); } | |
// support "co_await <duration>" in coroutine | |
template <class Rep, class Period> | |
auto await_transform(const std::chrono::duration<Rep, Period>& d) | |
{ | |
struct awaiter { | |
thread_pool* tp_; | |
std::chrono::duration<Rep, Period> d_; | |
bool await_ready() { return false; } | |
void await_suspend(handle_type h) | |
{ | |
tp_->enqueue(h, thread_pool::time_point::clock::now() + d_); | |
} | |
void await_resume() {} | |
}; | |
return awaiter{tp_, d}; | |
} | |
}; | |
// adapt void(thread_pool&, Ts...) coroutine | |
namespace std::experimental { | |
template <typename... ArgTypes> | |
struct coroutine_traits<void, thread_pool&, ArgTypes...> { | |
using promise_type = void_promise; | |
}; | |
} | |
std::mutex cout_mutex; | |
#define COUT std::unique_lock{cout_mutex}, std::cout | |
void sleep_sort(thread_pool& tp, int n) | |
{ | |
using namespace std::chrono_literals; | |
co_await (n * 100ms); | |
COUT << n << " @" << std::this_thread::get_id() << std::endl; | |
} | |
int main() | |
{ | |
thread_pool tp(2); | |
sleep_sort(tp, 6); | |
sleep_sort(tp, 2); | |
sleep_sort(tp, 5); | |
sleep_sort(tp, 3); | |
sleep_sort(tp, 1); | |
sleep_sort(tp, 4); | |
tp.run_loop(); | |
} |
fix coroutine state leak https://wandbox.org/permlink/4Wqi6sZmVzdvpgSv
https://wandbox.org/permlink/X6TaqpJ4egbulzq2
fix coroutine task leak, remove redundant coro_action class
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
small refined version https://wandbox.org/permlink/xBQVRdVJb9m0iHz5