Skip to content

Instantly share code, notes, and snippets.

@mrdomino
Last active August 29, 2015 14:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mrdomino/aa82caa676045884551a to your computer and use it in GitHub Desktop.
Save mrdomino/aa82caa676045884551a to your computer and use it in GitHub Desktop.
#include <cassert>
#include <future>
#include <list>
#include <queue>
#include <thread>
#include <vector>
class State {
public:
State(): started(false), pending_get_values(false), exiting(false) {}
std::vector<int> values;
size_t i;
bool started;
bool pending_get_values;
bool exiting;
};
class Action {
public:
virtual ~Action() {}
virtual bool run(State*) = 0;
};
class Start : public Action {
public:
using result_type = bool;
Start(std::promise<result_type>&& p): p_(std::move(p)) {}
bool run(State* state) override {
p_.set_value(!state->started);
state->started = true;
return true;
}
private:
std::promise<result_type> p_;
};
class Stop : public Action {
public:
using result_type = bool;
Stop(std::promise<result_type>&& p): p_(std::move(p)) {}
bool run(State* state) override {
p_.set_value(state->started);
state->started = false;
return true;
}
private:
std::promise<result_type> p_;
};
class GetValues : public Action {
public:
using result_type = std::pair<bool, std::vector<int>>;
GetValues(std::promise<result_type>&& p, size_t n):
n_(n), p_(std::move(p)) {}
bool run(State* state) override {
if (!state->started) {
p_.set_value({false, {}});
return true;
}
if (state->pending_get_values)
return false;
if (state->values.size() < n_) {
state->pending_get_values = true;
return false;
}
if (state->values.size() == n_) {
p_.set_value({true, std::move(state->values)});
state->values.clear();
} else { // state->values.size() > n_
p_.set_value({true, std::vector<int>(
state->values.begin(), state->values.begin() + n_)});
state->values.erase(state->values.begin(), state->values.begin() + n_);
}
return true;
}
private:
const size_t n_;
std::promise<result_type> p_;
};
class Runner {
public:
template <typename A, typename... Args>
void add_action(Args&&... args) {
std::lock_guard<std::mutex> l(queue_lock_);
queue_.push(std::make_unique<A>(
std::forward<Args>(args)...));
}
void retrieve_new_actions() {
std::lock_guard<std::mutex> l(queue_lock_);
while (!queue_.empty()) {
auto a = std::move(queue_.front());
queue_.pop();
running_.emplace_back(std::move(a));
}
}
void step_state() {
if (state_.started) {
state_.values.push_back(state_.i++);
}
state_.pending_get_values = false;
}
void run_actions() {
for (auto it = running_.begin(); it != running_.end(); ++it) {
if ((*it)->run(&state_))
it = running_.erase(it);
}
}
bool run() {
retrieve_new_actions();
step_state();
run_actions();
return !state_.exiting;
}
private:
std::mutex queue_lock_; // not needed if using a lock-free queue
std::queue<std::unique_ptr<Action>> queue_;
State state_;
std::list<std::unique_ptr<Action>> running_;
};
class Session {
public:
Session():
runner_thread_(
[this] {
while (runner_.run()) {}
}) {}
~Session() {
runner_.add_action<ExitAction>();
runner_thread_.join();
}
bool start() {
return blocking<Start>();
}
bool stop() {
return blocking<Stop>();
}
GetValues::result_type get_values(size_t n) {
return blocking<GetValues>(n);
}
private:
class ExitAction : public Action {
public:
bool run(State* state) override {
state->exiting = true;
return true;
}
};
template <typename A, typename... Args>
typename A::result_type blocking(Args&&... args) {
std::promise<typename A::result_type> p;
auto r = p.get_future();
runner_.add_action<A>(std::move(p), std::forward<Args>(args)...);
return r.get();
}
Runner runner_;
std::thread runner_thread_;
};
Session session;
int main() {
session.start();
GetValues::result_type u, w;
auto t = std::thread(
[&u] {
u = session.get_values(1 << 10);
});
auto s = std::thread(
[&w] {
w = session.get_values(1 << 11);
});
auto v = session.get_values(1 << 12);
s.join();
t.join();
session.stop();
printf("%zu %zu %zu\n", u.second.size(), v.second.size(), w.second.size());
if (u.second.size() && v.second.size() && w.second.size())
printf("%d %d %d\n", u.second.at(0), v.second.at(0), w.second.at(0));
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment