Skip to content

Instantly share code, notes, and snippets.

@sean-parent
Last active January 25, 2017 05:41
Show Gist options
  • Save sean-parent/ecc4f10452d6dbcd50c992c20f1e42b9 to your computer and use it in GitHub Desktop.
Save sean-parent/ecc4f10452d6dbcd50c992c20f1e42b9 to your computer and use it in GitHub Desktop.
First cut at experimental channel design - no flow control yet (will be a future<void> returned by send).
#include <cstddef>
#include <deque>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <tuple>
#include <type_traits>
#include <utility>

#include <boost/optional.hpp>
#include <boost/thread/future.hpp>
namespace stlab {
/// Type-erased, shared handle to an arbitrary object (a "process").
///
/// A process stores a std::shared_ptr to a model<T> that wraps a value of type T, so copies
/// of a process refer to the same wrapped object. Non-empty instances are produced by the
/// friend function make_process<T>(args...); a default-constructed process holds nothing.
class process {
    // Polymorphic base of every model<T>; its virtual destructor lets the wrapped object be
    // destroyed through the base. It is deliberately not called `concept`: that identifier
    // is a reserved keyword from C++20 onwards and would stop this code compiling there.
    struct concept_t {
        virtual ~concept_t() = default;
    };

    // Owns the wrapped object, constructed in place from the forwarded arguments.
    template <typename T>
    struct model : concept_t {
        template <typename... Args>
        model(Args&&... args) : _self(std::forward<Args>(args)...) { }

        T _self;
    };

    std::shared_ptr<concept_t> _p;

    template <class T, class... Args>
    friend process make_process(Args&&... args);

    // Private: only make_process() may wrap an object.
    process(std::shared_ptr<concept_t> p) : _p(std::move(p)) { }

public:
    /// Creates an empty handle that owns nothing.
    process() = default;
};
namespace detail {
/// Abstract sending side of a channel's shared state.
template <class T>
struct shared_sender {
    /// Virtual so an implementation can safely be destroyed through this interface.
    virtual ~shared_sender() = default;

    /// Hands the value x to the channel; returns a future<void> produced by the implementation.
    virtual boost::future<void> send(T x) = 0;

    /// Tells the channel that no further values will be sent.
    virtual void close() = 0;

    /// Passes a process object to the channel.
    virtual void set_process(process) = 0;
};
/// Abstract receiving side of a channel's shared state.
template <class T>
struct shared_receiver {
    /// Virtual so an implementation can safely be destroyed through this interface.
    virtual ~shared_receiver() = default;

    /// Requests the next value; the result is delivered through the returned future as a
    /// boost::optional<T>.
    virtual boost::future<boost::optional<T>> receive() = 0;
};
/// Shared state behind one sender/receiver pair: a mutex-protected queue of pending values
/// plus at most one parked receive promise and one parked send promise.
template <class T>
struct shared_channel final : shared_sender<T>, shared_receiver<T> {
    process _process;              // process stored by set_process()
    std::mutex _mutex;             // held by send(), close() and receive()
    std::deque<T> _q;              // values sent but not yet received
    bool _closed = false;          // set by close(); receive() then reports end once _q is empty
    std::size_t _buffer_size = 1;  // queue length at which send() returns a pending future
    // Set while a receive() is waiting for a value (the queue was empty when it was called).
    boost::optional<boost::promise<boost::optional<T>>> _receive_promise;
    // Set while a send() is waiting for the receiver to take a value from the full queue.
    boost::optional<boost::promise<void>> _send_promise;

    /*
        Delivers x to a waiting receiver if there is one, otherwise queues it.

        The returned future is ready immediately unless this send made the queue length equal
        to _buffer_size; in that case it becomes ready when receive() next pops a value.

        NOTE(review): the test is `==`, so a caller that keeps sending without waiting on the
        returned future grows the queue past _buffer_size and gets ready futures back.
    */
    boost::future<void> send(T x) override {
        std::lock_guard<std::mutex> lock(_mutex);
        // REVISIT : set_value() under the lock might deadlock if immediate continuation?
        if (_q.empty() && _receive_promise) {
            _receive_promise->set_value(std::move(x));
            _receive_promise.reset();
            return boost::make_ready_future();
        }
        _q.emplace_back(std::move(x));
        if (_q.size() == _buffer_size) {
            _send_promise = boost::promise<void>();
            return _send_promise.get().get_future();
        }
        return boost::make_ready_future();
    }

    /*
        Marks the channel closed. Values already queued can still be received; once the queue
        is empty receive() yields an empty optional.

        The closed flag is always set and a fulfilled receive promise is always cleared.
        Otherwise a later send()/close() would call set_value() on an already satisfied
        promise, and a later receive() would wait for a value that can never arrive.
    */
    void close() override {
        std::lock_guard<std::mutex> lock(_mutex);
        _closed = true;
        // REVISIT : set_value() under the lock might deadlock if immediate continuation?
        if (_q.empty() && _receive_promise) {
            _receive_promise.get().set_value(boost::optional<T>());
            _receive_promise.reset();
        }
    }

    void set_process(process p) override {
        // Should not need to lock, can only be set once and controls lifetime of process bound
        // to this sender
        _process = std::move(p);
    }

    /*
        Returns the next value as a future:
          - ready with the front of the queue if one is queued (a parked send promise, if
            any, is fulfilled at the same time),
          - ready with an empty optional if the queue is empty and the channel is closed,
          - otherwise pending, fulfilled by a later send() or close().
    */
    boost::future<boost::optional<T>> receive() override {
        std::lock_guard<std::mutex> lock(_mutex);
        if (!_q.empty()) {
            auto result = boost::make_ready_future<boost::optional<T>>(std::move(_q.front()));
            _q.pop_front();
            if (_send_promise) {
                _send_promise->set_value();
                _send_promise.reset();
            }
            return result;
        }
        if (_closed) {
            return boost::make_ready_future<boost::optional<T>>();
        }
        _receive_promise = boost::promise<boost::optional<T>>();
        return _receive_promise.get().get_future();
    }
};
} // namespace detail
template <class T> class receiver;
/// Sending handle of a channel. Holds only a weak reference to the channel state, so every
/// operation silently does nothing once that state has been destroyed.
template <class T>
class sender {
    std::weak_ptr<detail::shared_sender<T>> _p;

    template <class U>
    friend std::pair<sender<U>, receiver<U>> channel();

    sender(std::weak_ptr<detail::shared_sender<T>> p) : _p(std::move(p)) { }

public:
    sender() = default;

    /// Sends x through the channel. Returns the channel's future, or an already ready
    /// future when the channel state no longer exists.
    boost::future<void> operator()(T x) const {
        if (auto channel_state = _p.lock()) return channel_state->send(std::move(x));
        return boost::make_ready_future();
    }

    /// Closes the channel if its state still exists.
    void close() const {
        if (auto channel_state = _p.lock()) channel_state->close();
    }

    /*
        REVISIT : the process being set must hold the sender (this completes a strong/weak cycle).
        Is there a better way?
    */
    /// Hands the process x to the channel if its state still exists.
    void set_process(process x) const {
        if (auto channel_state = _p.lock()) channel_state->set_process(std::move(x));
    }
};
/// Receiving handle of a channel. Holds a strong reference to the channel state.
template <class T>
class receiver {
    std::shared_ptr<detail::shared_receiver<T>> _p;

    template <class U>
    friend std::pair<sender<U>, receiver<U>> channel();

    receiver(std::shared_ptr<detail::shared_receiver<T>> p) : _p(std::move(p)) { }

public:
    /// Creates a receiver that is not attached to any channel.
    receiver() = default;

    /// Requests the next value from the channel.
    ///
    /// A receiver that is not attached to a channel (default constructed) returns a ready
    /// future holding an empty optional instead of dereferencing a null pointer.
    boost::future<boost::optional<T>> operator()() const {
        if (!_p) return boost::make_ready_future<boost::optional<T>>();
        return _p->receive();
    }
};
/// Creates a new channel and returns its two ends. The sender refers to the shared state
/// weakly and the receiver strongly.
template <class T>
std::pair<sender<T>, receiver<T>> channel() {
    auto state = std::make_shared<detail::shared_channel<T>>();
    sender<T> sending_end(state);
    receiver<T> receiving_end(state);
    return std::pair<sender<T>, receiver<T>>(std::move(sending_end), std::move(receiving_end));
}
/// Constructs a T from args inside a shared, type-erased holder and returns it as a process.
template <class T, class... Args>
process make_process(Args&&... args) {
    using holder = process::model<T>;
    auto wrapped = std::make_shared<holder>(std::forward<Args>(args)...);
    return process(std::move(wrapped));
}
} // namespace stlab
/// Process that reads values two at a time from _receive and sends their product to _send.
/// If the input ends after an odd number of values the unpaired value is sent unchanged.
/// The output is closed when the input ends.
///
/// The continuations capture `this`, so the object has to stay alive while a receive or
/// send is outstanding.
struct mul2 {
    stlab::receiver<int> _receive;
    stlab::sender<int> _send;

    /// Stores both channel ends and immediately starts processing.
    mul2(stlab::receiver<int> receive, stlab::sender<int> send) :
        _receive(std::move(receive)), _send(std::move(send)) {
        run();
    }

    /// Handles one pair of inputs, then re-arms itself for the next pair.
    void run() {
        _receive().then([this](auto x){
            auto opt = x.get();
            if (!opt) {
                // Input ended on a pair boundary. Stop here: falling through would
                // dereference the empty optional and issue another receive.
                _send.close();
                return;
            }
            _receive().then([this, _x = *opt](auto y){
                auto opt = y.get();
                if (!opt) _send(_x).then([this](auto){ _send.close(); });
                else _send(_x * *opt).then([this](auto){ run(); });
            });
        });
    }
};
/// Process that sends the integers of the half-open range [min, max) in increasing order
/// and then closes the channel. Each value is sent only after the future returned by the
/// previous send has completed.
///
/// The continuation captures `this`, so the object has to stay alive while a send is
/// outstanding.
struct iota {
    stlab::sender<int> _send;
    int _min;  // next value to send
    int _max;  // one past the last value to send

    /// Stores the sender and the range, then immediately starts sending.
    iota(stlab::sender<int> send, int min, int max) :
        _send(std::move(send)), _min(min), _max(max) {
        run();
    }

    /// Sends the next value, or closes the channel once the range is exhausted.
    void run() {
        // `>=` rather than `==`: a range with min > max is treated as empty instead of
        // counting upwards until the int overflows.
        if (_min >= _max) {
            _send.close();
            return;
        }
        _send(_min).then([this](auto){
            ++_min;
            run();
        });
    }
};
/// Process that adds up every value read from _receive. When the input ends it sends the
/// total to _send and then closes it.
struct sum {
    stlab::receiver<int> _receive;
    stlab::sender<int> _send;
    int _result = 0;  // running total

    /// Stores both channel ends and immediately starts processing.
    sum(stlab::receiver<int> receive, stlab::sender<int> send)
        : _receive(std::move(receive)), _send(std::move(send)) {
        run();
    }

    /// Waits for one input; accumulates it and re-arms, or emits the total at end of input.
    void run() {
        _receive().then([this](auto incoming) {
            const auto value = incoming.get();
            if (value) {
                _result += *value;
                run();  // wait for the next input
            } else {
                _send(_result).then([this](auto) { _send.close(); });
            }
        });
    }
};
// Primary template; only the function-signature form map<F, R(A)> below is defined.
template <class F, class T> struct map;

/// Process that applies _f to every value of type A read from _receive and sends the result
/// to _send as an R. The output is closed when the input ends.
template <class F, class R, class A>
struct map<F, R(A)> {
    stlab::receiver<A> _receive;
    stlab::sender<R> _send;
    F _f;

    /// Stores both channel ends and the function, then immediately starts processing.
    map(stlab::receiver<A> receive, stlab::sender<R> send, F f)
        : _receive(std::move(receive)), _send(std::move(send)), _f(std::move(f)) {
        run();
    }

    /// Waits for one input; forwards the transformed value and re-arms once the send
    /// completes, or closes the output at end of input.
    void run() {
        _receive().then([this](auto incoming) {
            auto value = incoming.get();
            if (value) {
                _send(_f(*value)).then([this](auto) { run(); });
            } else {
                _send.close();
            }
        });
    }
};
template <class T, class F, class R, class S>
auto make_map(R&& r, S&& s, F&& f) {
return stlab::make_process<map<std::decay_t<F>, T>>(std::forward<R>(r), std::forward<S>(s), std::forward<F>(f));
}
/// Demo pipeline: iota(0, 10) -> map(x * 10) -> sum, printing everything that arrives on the
/// final channel until it is closed.
int main() {
    // In each pair, .first is the sender and .second is the receiver.

    // Stage 1: emits 0 through 9.
    auto numbers = stlab::channel<int>();
    numbers.first.set_process(stlab::make_process<iota>(numbers.first, 0, 10));

    // Stage 2: multiplies every value by ten.
    auto scaled = stlab::channel<int>();
    scaled.first.set_process(
        make_map<int(int)>(numbers.second, scaled.first, [](int x){ return x * 10; }));

    // Stage 3: adds everything up and emits the total once its input ends.
    auto total = stlab::channel<int>();
    total.first.set_process(stlab::make_process<sum>(scaled.second, total.first));

    // Block on each result in turn; an empty optional marks the end of the stream.
    auto value = total.second().get();
    while (value) {
        std::cout << *value << std::endl;
        value = total.second().get();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment