Instantly share code, notes, and snippets.
Created
April 21, 2020 10:23
-
Save madmongo1/12e87380b3f3d27d2ceb67bcc78a990a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// Copyright (c) 2020 Richard Hodges (hodges.r@gmail.com) | |
// Copyright (c) 2020 Alexander Hodges | |
// | |
// Official repository: https://github.com/AlexAndDad/gateway | |
// | |
#pragma once | |
#include "polyfill/net.hpp" | |
namespace polyfill::net | |
{ | |
template < class Impl, class Work, class Handler, class Signature> | |
struct shared_composed_op | |
{ | |
using composed_op_type = net::detail::composed_op< Impl, Work, Handler, Signature >; | |
using allocator_type = typename net::associated_allocator< composed_op_type >::type; | |
using executor_type = typename net::associated_executor< composed_op_type >::type; | |
shared_composed_op(composed_op_type &&op) | |
: impl_(std::make_shared< composed_op_type >(std::move(op))) | |
{ | |
} | |
shared_composed_op(std::shared_ptr< composed_op_type > op) | |
: impl_(std::move(op)) | |
{ | |
} | |
void initial_resume() { impl_->impl_(*this); } | |
template < class... Args > | |
void operator()(Args &&... args) | |
{ | |
if (impl_->invocations_ < ~unsigned(0)) | |
{ | |
++impl_->invocations_; | |
impl_->impl_(*this, std::forward< Args >(args)...); | |
} | |
} | |
template < class... Args > | |
void complete(Args &&... args) | |
{ | |
impl_->complete(std::forward< Args >(args)...); | |
} | |
auto get_allocator() const -> allocator_type { return impl_->get_allocator(); } | |
auto get_executor() const -> executor_type { return impl_->get_executor(); } | |
std::shared_ptr< composed_op_type > impl_; | |
}; | |
template < class Impl, class Work, class Handler, class Signature > | |
auto share(net::detail::composed_op< Impl, Work, Handler, Signature > &composed_op) | |
{ | |
auto op = shared_composed_op< Impl, Work, Handler, Signature >(std::move(composed_op)); | |
op.initial_resume(); | |
return op; | |
} | |
template < class Impl, class Work, class Handler, class Signature > | |
auto share(shared_composed_op< Impl, Work, Handler, Signature > shared_thing) | |
{ | |
return shared_thing; | |
} | |
template < typename Impl, typename Work, typename Handler, typename Signature > | |
inline void *asio_handler_allocate(std::size_t size, | |
shared_composed_op< Impl, Work, Handler, Signature > *this_handler) | |
{ | |
return boost_asio_handler_alloc_helpers::allocate(size, this_handler->impl_->handler_); | |
} | |
template < typename Impl, typename Work, typename Handler, typename Signature > | |
inline void asio_handler_deallocate(void * pointer, | |
std::size_t size, | |
shared_composed_op< Impl, Work, Handler, Signature > *this_handler) | |
{ | |
boost_asio_handler_alloc_helpers::deallocate(pointer, size, this_handler->impl_->handler_); | |
} | |
template < typename Impl, typename Work, typename Handler, typename Signature > | |
inline bool asio_handler_is_continuation(shared_composed_op< Impl, Work, Handler, Signature > *this_handler) | |
{ | |
return asio_handler_is_continuation(this_handler->impl_.get()); | |
} | |
template < typename Function, typename Impl, typename Work, typename Handler, typename Signature > | |
inline void asio_handler_invoke(Function & function, | |
shared_composed_op< Impl, Work, Handler, Signature > *this_handler) | |
{ | |
boost_asio_handler_invoke_helpers::invoke(function, this_handler->impl_->handler_); | |
} | |
template < typename Function, typename Impl, typename Work, typename Handler, typename Signature > | |
inline void asio_handler_invoke(const Function & function, | |
shared_composed_op< Impl, Work, Handler, Signature > *this_handler) | |
{ | |
boost_asio_handler_invoke_helpers::invoke(function, this_handler->impl_->handler_); | |
} | |
} // namespace polyfill::net | |
#include "polyfill/net/shared_composed_op.hpp" | |
#include <boost/beast/_experimental/test/handler.hpp> | |
#include <boost/beast/core/bind_handler.hpp> | |
#include <catch2/catch.hpp> | |
#include <iostream> | |
using namespace polyfill; | |
using namespace std::literals; | |
using boost::beast::bind_front_handler; | |
struct has_error | |
{ | |
error_code error; | |
error_code &assign_error(error_code &ec) | |
{ | |
if (not error.failed() and ec.failed()) | |
error = ec; | |
return error; | |
} | |
}; | |
template < class Derived > | |
struct has_timeout | |
{ | |
has_timeout(net::executor e, std::chrono::milliseconds to) | |
: timer_(e) | |
, to_(to) | |
{ | |
} | |
struct timer_event | |
{ | |
}; | |
template < class Self > | |
void start_timeout(Self &self) | |
{ | |
timer_.expires_after(to_); | |
timer_.async_wait(bind_front_handler(net::share(self), timer_event())); | |
} | |
void cancel_timeout() { timer_.cancel(); } | |
template < class Self > | |
void operator()(Self &self, timer_event, error_code ec) | |
{ | |
auto &this_ = *static_cast< Derived * >(this); | |
timeout = true; | |
if (ec == net::error::operation_aborted) | |
ec.clear(); | |
else if (ec == error_code()) | |
{ | |
ec = net::error::timed_out; | |
this_.on_timeout(); | |
} | |
this_(self, this_.assign_error(ec)); | |
} | |
bool timeout = false; | |
net::high_resolution_timer timer_; | |
net::high_resolution_timer::duration to_; | |
}; | |
template < class Protocol, class Derived > | |
struct has_resolver | |
{ | |
using resolver = typename Protocol::resolver; | |
using resolver_results = typename resolver::results_type; | |
has_resolver(net::executor e) | |
: resolver_(e) | |
{ | |
} | |
void cancel_resolver() { resolver_.cancel(); } | |
template < class Self > | |
void start_resolving(Self &self, std::string const &hostname, std::string const &service) | |
{ | |
resolver_.async_resolve(hostname, service, net::share(self)); | |
} | |
template < class Self > | |
void operator()(Self &self, error_code ec, typename resolver::results_type results) | |
{ | |
auto &this_ = *static_cast< Derived * >(this); | |
resolved = true; | |
if (ec == net::error::operation_aborted) | |
ec.clear(); | |
else if (ec.failed()) | |
this_.cancel_timeout(); | |
else | |
endpoints = results; | |
this_(self, this_.assign_error(ec)); | |
} | |
resolver resolver_; | |
resolver_results endpoints; | |
bool resolved = false; | |
}; | |
template < class Protocol, class Derived > | |
struct has_multiconnect | |
{ | |
has_multiconnect(net::executor e) {} | |
private: | |
using protocol = Protocol; | |
using socket_type = typename protocol::socket; | |
using endpoint_type = typename protocol::endpoint; | |
using resolver = typename protocol::resolver; | |
using resolver_results = typename resolver::results_type; | |
struct socket_id | |
{ | |
std::size_t sock; | |
}; | |
public: | |
void cancel_connects() | |
{ | |
if (not sockets_canceled) | |
{ | |
sockets_canceled = true; | |
for (auto &s : socks) | |
s.cancel(); | |
} | |
} | |
template < class Self > | |
void operator()(Self &self, socket_id id, error_code ec) | |
{ | |
auto &this_ = *static_cast< Derived * >(this); | |
--sockets_remaining; | |
if (not ec.failed() and not sockets_canceled) // we are first socket to complete | |
{ | |
this->cancel_connects(); | |
this_.on_socket_connect(std::move(socks[id.sock])); | |
} | |
else if (ec == net::error::operation_aborted) | |
ec.clear(); | |
if (sockets_remaining) | |
ec.clear(); | |
this_(self, this_.assign_error(ec)); | |
} | |
template < class Self > | |
void initiate_connects(Self &self, resolver_results const &endpoints) | |
{ | |
std::transform(endpoints.begin(), endpoints.end(), std::back_inserter(socks), [&](auto iter) { | |
return socket_type(self.get_executor()); | |
}); | |
auto i = std::size_t(0); | |
for (auto &&epi : endpoints) | |
{ | |
socks[i].async_connect(epi.endpoint(), boost::beast::bind_front_handler(net::share(self), socket_id { i })); | |
++i; | |
} | |
sockets_remaining = i; | |
} | |
std::vector< socket_type > socks; | |
std::size_t sockets_remaining = 0; | |
bool sockets_canceled = false; | |
}; | |
template < class Socket > | |
struct mass_connect_op | |
: net::coroutine | |
, has_error | |
, has_timeout< mass_connect_op< Socket > > | |
, has_resolver< typename Socket::protocol_type, mass_connect_op< Socket > > | |
, has_multiconnect< typename Socket::protocol_type, mass_connect_op< Socket > > | |
{ | |
using protocol = typename Socket::protocol_type; | |
using socket_type = typename protocol::socket; | |
using endpoint_type = typename protocol::endpoint; | |
using resolver = typename protocol::resolver; | |
mass_connect_op(Socket &sock, std::string hostname, std::string port, std::chrono::milliseconds timeout) | |
: net::coroutine() | |
, has_error() | |
, has_timeout< mass_connect_op< Socket > >(sock.get_executor(), timeout) | |
, has_resolver< protocol, mass_connect_op< Socket > >(sock.get_executor()) | |
, has_multiconnect< protocol, mass_connect_op< Socket > >(sock.get_executor()) | |
, sock_(sock) | |
, hostname_(std::move(hostname)) | |
, port_(std::move(port)) | |
{ | |
} | |
using has_timeout< mass_connect_op< Socket > >:: operator(); | |
using has_resolver< protocol, mass_connect_op< Socket > >::operator(); | |
using has_multiconnect< protocol, mass_connect_op< Socket > >::operator(); | |
void on_timeout() | |
{ | |
this->cancel_resolver(); | |
this->cancel_connects(); | |
} | |
void on_resolved() { this->cancel_timeout(); } | |
void on_socket_connect(Socket s) | |
{ | |
this->cancel_timeout(); | |
sock_ = std::move(s); | |
} | |
template < class Self > | |
void operator()(Self &self, error_code ec = {}) | |
{ | |
#include <boost/asio/yield.hpp> | |
reenter(this) for (;;) | |
{ | |
// transform this coroutine into a shared coroutine | |
yield net::share(self); | |
// start the timer and a resolve operation | |
yield | |
{ | |
this->start_timeout(self); | |
this->start_resolving(self, hostname_, port_); | |
} | |
// wait for the resolve to complete one way or another | |
while (not this->resolved) | |
yield; | |
// if we have a failure, complete... | |
if (ec.failed()) | |
{ | |
// ... after ensuring that the timer completion handler has been invoked | |
while (not this->timeout) | |
yield; | |
} | |
else | |
{ | |
// otherwise, simulatenously connect to all endpoints | |
yield this->initiate_connects(self, this->endpoints); | |
// yield until all connect completion handlers have been invoked | |
while (this->sockets_remaining) | |
yield; | |
// and yield until the timer completion handler has been invoked | |
while (not this->timeout) | |
yield; | |
} | |
// return the result of the connect | |
return self.complete(ec); | |
} | |
#include <boost/asio/unyield.hpp> | |
} | |
Socket & sock_; | |
std::string hostname_, port_; | |
}; | |
template < class Socket, class CompletionToken > | |
auto async_mass_connect(Socket & sock, | |
std::string host, | |
std::string port, | |
std::chrono::milliseconds timeout, | |
CompletionToken && token) | |
{ | |
return net::async_compose< CompletionToken, void(error_code) >( | |
mass_connect_op< Socket >(sock, std::move(host), std::move(port), timeout), token, sock); | |
} | |
TEST_CASE("polyfill::net::shared_composed_op") | |
{ | |
auto ioc = net::io_context(); | |
auto s = net::ip::tcp::socket(ioc); | |
async_mass_connect(s, "www.example.com", "http", 10s, [&](error_code ec) { | |
std::cout << ec.message() << std::endl; | |
auto ep = s.remote_endpoint(ec); | |
std::cout << ep << std::endl; | |
}); | |
boost::beast::test::run(ioc); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment