Last active
August 29, 2015 14:23
-
-
Save mrdomino/aa82caa676045884551a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <future>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>
// Mutable state that actions read and modify.
//
// Every field has an in-class initializer so a default-constructed State is
// fully defined. In particular `i` starts at 0; it used to be left
// uninitialized even though it is read and incremented when values are
// produced.
class State {
 public:
  State() = default;

  std::vector<int> values;          // values produced so far and not yet handed out
  std::size_t i{0};                 // next value to append to `values`
  bool started{false};              // toggled by the Start / Stop actions
  bool pending_get_values{false};   // a GetValues action is waiting for more values
  bool exiting{false};              // set once an exit has been requested
};
class Action { | |
public: | |
virtual ~Action() {} | |
virtual bool run(State*) = 0; | |
}; | |
class Start : public Action { | |
public: | |
using result_type = bool; | |
Start(std::promise<result_type>&& p): p_(std::move(p)) {} | |
bool run(State* state) override { | |
p_.set_value(!state->started); | |
state->started = true; | |
return true; | |
} | |
private: | |
std::promise<result_type> p_; | |
}; | |
class Stop : public Action { | |
public: | |
using result_type = bool; | |
Stop(std::promise<result_type>&& p): p_(std::move(p)) {} | |
bool run(State* state) override { | |
p_.set_value(state->started); | |
state->started = false; | |
return true; | |
} | |
private: | |
std::promise<result_type> p_; | |
}; | |
class GetValues : public Action { | |
public: | |
using result_type = std::pair<bool, std::vector<int>>; | |
GetValues(std::promise<result_type>&& p, size_t n): | |
n_(n), p_(std::move(p)) {} | |
bool run(State* state) override { | |
if (!state->started) { | |
p_.set_value({false, {}}); | |
return true; | |
} | |
if (state->pending_get_values) | |
return false; | |
if (state->values.size() < n_) { | |
state->pending_get_values = true; | |
return false; | |
} | |
if (state->values.size() == n_) { | |
p_.set_value({true, std::move(state->values)}); | |
state->values.clear(); | |
} else { // state->values.size() > n_ | |
p_.set_value({true, std::vector<int>( | |
state->values.begin(), state->values.begin() + n_)}); | |
state->values.erase(state->values.begin(), state->values.begin() + n_); | |
} | |
return true; | |
} | |
private: | |
const size_t n_; | |
std::promise<result_type> p_; | |
}; | |
class Runner { | |
public: | |
template <typename A, typename... Args> | |
void add_action(Args&&... args) { | |
std::lock_guard<std::mutex> l(queue_lock_); | |
queue_.push(std::make_unique<A>( | |
std::forward<Args>(args)...)); | |
} | |
void retrieve_new_actions() { | |
std::lock_guard<std::mutex> l(queue_lock_); | |
while (!queue_.empty()) { | |
auto a = std::move(queue_.front()); | |
queue_.pop(); | |
running_.emplace_back(std::move(a)); | |
} | |
} | |
void step_state() { | |
if (state_.started) { | |
state_.values.push_back(state_.i++); | |
} | |
state_.pending_get_values = false; | |
} | |
void run_actions() { | |
for (auto it = running_.begin(); it != running_.end(); ++it) { | |
if ((*it)->run(&state_)) | |
it = running_.erase(it); | |
} | |
} | |
bool run() { | |
retrieve_new_actions(); | |
step_state(); | |
run_actions(); | |
return !state_.exiting; | |
} | |
private: | |
std::mutex queue_lock_; // not needed if using a lock-free queue | |
std::queue<std::unique_ptr<Action>> queue_; | |
State state_; | |
std::list<std::unique_ptr<Action>> running_; | |
}; | |
class Session { | |
public: | |
Session(): | |
runner_thread_( | |
[this] { | |
while (runner_.run()) {} | |
}) {} | |
~Session() { | |
runner_.add_action<ExitAction>(); | |
runner_thread_.join(); | |
} | |
bool start() { | |
return blocking<Start>(); | |
} | |
bool stop() { | |
return blocking<Stop>(); | |
} | |
GetValues::result_type get_values(size_t n) { | |
return blocking<GetValues>(n); | |
} | |
private: | |
class ExitAction : public Action { | |
public: | |
bool run(State* state) override { | |
state->exiting = true; | |
return true; | |
} | |
}; | |
template <typename A, typename... Args> | |
typename A::result_type blocking(Args&&... args) { | |
std::promise<typename A::result_type> p; | |
auto r = p.get_future(); | |
runner_.add_action<A>(std::move(p), std::forward<Args>(args)...); | |
return r.get(); | |
} | |
Runner runner_; | |
std::thread runner_thread_; | |
}; | |
Session session; | |
int main() { | |
session.start(); | |
GetValues::result_type u, w; | |
auto t = std::thread( | |
[&u] { | |
u = session.get_values(1 << 10); | |
}); | |
auto s = std::thread( | |
[&w] { | |
w = session.get_values(1 << 11); | |
}); | |
auto v = session.get_values(1 << 12); | |
s.join(); | |
t.join(); | |
session.stop(); | |
printf("%zu %zu %zu\n", u.second.size(), v.second.size(), w.second.size()); | |
if (u.second.size() && v.second.size() && w.second.size()) | |
printf("%d %d %d\n", u.second.at(0), v.second.at(0), w.second.at(0)); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment