Last active
January 25, 2017 05:41
-
-
Save sean-parent/ecc4f10452d6dbcd50c992c20f1e42b9 to your computer and use it in GitHub Desktop.
First cut at an experimental channel design. Flow control is rudimentary: send returns a future<void> that resolves once the channel has room for another value.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// NOTE: future::then() is only available when Boost.Thread is built with
// BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION (e.g. BOOST_THREAD_VERSION >= 4).
#include <cstddef>
#include <deque>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <tuple>
#include <type_traits>
#include <utility>

#include <boost/optional.hpp>
#include <boost/thread/future.hpp>
namespace stlab { | |
/// Type-erased, shared-ownership handle to a running process object.
///
/// A non-empty `process` can only be produced by `make_process<T>(args...)`,
/// which constructs a `T` in place inside a `model<T>`. Copies of the handle
/// share that object; it is destroyed when the last copy goes away. A
/// default-constructed `process` owns nothing.
class process {
    // Named `concept_t` rather than `concept`: `concept` is a keyword from
    // C++20 onward, so the old name stops compiling with newer standards.
    struct concept_t {
        virtual ~concept_t() = default;
    };

    // Holds the concrete process object by value behind the erased interface.
    template <typename T>
    struct model : concept_t {
        template <typename... Args>
        model(Args&&... args) : _self(std::forward<Args>(args)...) { }

        T _self;
    };

    std::shared_ptr<concept_t> _p;

    template <class T, class... Args>
    friend process make_process(Args&&... args);

    // Private: only make_process() may wrap an implementation pointer.
    explicit process(std::shared_ptr<concept_t> p) : _p(std::move(p)) { }

public:
    /// Creates an empty handle that owns no process.
    process() = default;
};
namespace detail { | |
template <class T> | |
struct shared_sender { | |
virtual boost::future<void> send(T x) = 0; | |
virtual void close() = 0; | |
virtual void set_process(process) = 0; | |
}; | |
template <class T> | |
struct shared_receiver { | |
virtual boost::future<boost::optional<T>> receive() = 0; | |
}; | |
template <class T> | |
struct shared_channel final : shared_sender<T>, shared_receiver<T> { | |
process _process; | |
std::mutex _mutex; | |
std::deque<T> _q; | |
bool _closed = false; | |
std::size_t _buffer_size = 1; | |
boost::optional<boost::promise<boost::optional<T>>> _receive_promise; | |
boost::optional<boost::promise<void>> _send_promise; | |
/* | |
REVISIT : No flow control. Send should return a future<void> which is auto resolved | |
if there is space in the queue, otherwise it isn't resolved until there is space. | |
*/ | |
boost::future<void> send(T x) override { | |
std::lock_guard<std::mutex> lock(_mutex); | |
// REVISIT : set_value() under the lock might deadlock if immediate continuation? | |
if (_q.empty() && _receive_promise) { | |
_receive_promise->set_value(std::move(x)); | |
_receive_promise.reset(); | |
return boost::make_ready_future(); | |
} | |
_q.emplace_back(std::move(x)); | |
if (_q.size() == _buffer_size) { | |
_send_promise = boost::promise<void>(); | |
return _send_promise.get().get_future(); | |
} | |
return boost::make_ready_future(); | |
} | |
void close() override { | |
std::lock_guard<std::mutex> lock(_mutex); | |
// REVISIT : set_value() under the lock might deadlock if immediate continuation? | |
if (_q.empty() && _receive_promise) _receive_promise.get().set_value(boost::optional<T>()); | |
else _closed = true; | |
} | |
void set_process(process p) override { | |
// Should not need to lock, can only be set once and controls lifetime of process bound | |
// to this sender | |
_process = std::move(p); | |
} | |
boost::future<boost::optional<T>> receive() override { | |
{ | |
std::lock_guard<std::mutex> lock(_mutex); | |
if (!_q.empty()) { | |
auto result = boost::make_ready_future<boost::optional<T>>(std::move(_q.front())); | |
_q.pop_front(); | |
if (_send_promise) { | |
_send_promise->set_value(); | |
_send_promise.reset(); | |
} | |
return result; | |
} | |
if (_closed) { | |
return boost::make_ready_future<boost::optional<T>>(); | |
} | |
_receive_promise = boost::promise<boost::optional<T>>(); | |
return _receive_promise.get().get_future(); | |
} | |
} | |
}; | |
} // namespace detail | |
template <class T> class receiver; | |
template <class T> | |
class sender { | |
std::weak_ptr<detail::shared_sender<T>> _p; | |
template <class U> | |
friend std::pair<sender<U>, receiver<U>> channel(); | |
sender(std::weak_ptr<detail::shared_sender<T>> p) : _p(std::move(p)) { } | |
public: | |
sender() = default; | |
boost::future<void> operator()(T x) const { | |
auto p = _p.lock(); | |
if (!p) return boost::make_ready_future(); | |
return p->send(std::move(x)); | |
} | |
void close() const { | |
auto p = _p.lock(); | |
if (!p) return; | |
p->close(); | |
} | |
/* | |
REVISIT : the process being set must hold the sender (this completes a strong/weak cycle). | |
Is there a better way? | |
*/ | |
void set_process(process x) const { | |
auto p = _p.lock(); | |
if (!p) return; | |
p->set_process(std::move(x)); | |
} | |
}; | |
template <class T> | |
class receiver { | |
std::shared_ptr<detail::shared_receiver<T>> _p; | |
template <class U> | |
friend std::pair<sender<U>, receiver<U>> channel(); | |
receiver(std::shared_ptr<detail::shared_receiver<T>> p) : _p(std::move(p)) { } | |
public: | |
receiver() = default; | |
boost::future<boost::optional<T>> operator()() const { | |
return _p->receive(); | |
} | |
}; | |
template <class T> | |
std::pair<sender<T>, receiver<T>> channel() { | |
auto p = std::make_shared<detail::shared_channel<T>>(); | |
return std::make_pair<sender<T>, receiver<T>>(sender<T>(p), receiver<T>(p)); | |
} | |
template <class T, class... Args> | |
process make_process(Args&&... args) { | |
return process(std::make_shared<process::model<T>>(std::forward<Args>(args)...)); | |
} | |
} // namespace stlab | |
struct mul2 { | |
stlab::receiver<int> _receive; | |
stlab::sender<int> _send; | |
mul2(stlab::receiver<int> receive, stlab::sender<int> send) : | |
_receive(std::move(receive)), _send(std::move(send)) { | |
run(); | |
} | |
void run() { | |
_receive().then([this](auto x){ | |
auto opt = x.get(); | |
if (!opt) _send.close(); | |
_receive().then([this, _x = *opt](auto y){ | |
auto opt = y.get(); | |
if (!opt) _send(_x).then([this](auto){ _send.close(); }); | |
else _send(_x * *opt).then([this](auto){ run(); }); | |
}); | |
}); | |
} | |
}; | |
struct iota { | |
stlab::sender<int> _send; | |
int _min; | |
int _max; | |
iota(stlab::sender<int> send, int min, int max) : | |
_send(std::move(send)), _min(min), _max(max) { | |
run(); | |
} | |
void run() { | |
if (_min == _max) { | |
_send.close(); | |
return; | |
} | |
_send(_min).then([this](auto){ | |
++_min; | |
run(); | |
}); | |
} | |
}; | |
struct sum { | |
stlab::receiver<int> _receive; | |
stlab::sender<int> _send; | |
int _result = 0; | |
sum(stlab::receiver<int> receive, stlab::sender<int> send) : | |
_receive(std::move(receive)), _send(std::move(send)) { | |
run(); | |
} | |
void run() { | |
_receive().then([this](auto x){ | |
auto opt = x.get(); | |
if (!opt) { | |
_send(_result).then([this](auto){ | |
_send.close(); | |
}); | |
return; | |
} | |
_result += *opt; | |
run(); // continue | |
}); | |
} | |
}; | |
template <class F, class T> struct map; | |
template <class F, class R, class A> | |
struct map<F, R(A)> { | |
stlab::receiver<A> _receive; | |
stlab::sender<R> _send; | |
F _f; | |
map(stlab::receiver<A> receive, stlab::sender<R> send, F f) : | |
_receive(std::move(receive)), _send(std::move(send)), _f(std::move(f)) | |
{ | |
run(); | |
} | |
void run() { | |
_receive().then([this](auto x){ | |
auto opt = x.get(); | |
if (!opt) _send.close(); | |
else _send(_f(*opt)).then([this](auto){ run(); }); | |
}); | |
} | |
}; | |
template <class T, class F, class R, class S> | |
auto make_map(R&& r, S&& s, F&& f) { | |
return stlab::make_process<map<std::decay_t<F>, T>>(std::forward<R>(r), std::forward<S>(s), std::forward<F>(f)); | |
} | |
int main() { | |
stlab::sender<int> send1; | |
stlab::receiver<int> receive1; | |
std::tie(send1, receive1) = stlab::channel<int>(); | |
send1.set_process(stlab::make_process<iota>(send1, 0, 10)); | |
stlab::sender<int> send2; | |
stlab::receiver<int> receive2; | |
std::tie(send2, receive2) = stlab::channel<int>(); | |
send2.set_process(make_map<int(int)>(receive1, send2, [](int x){ return x * 10; })); | |
stlab::sender<int> send3; | |
stlab::receiver<int> receive3; | |
std::tie(send3, receive3) = stlab::channel<int>(); | |
send3.set_process(stlab::make_process<sum>(receive2, send3)); | |
for (auto value = receive3().get(); value; value = receive3().get()) { | |
std::cout << *value << std::endl; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment