Created
December 8, 2012 12:36
-
-
Save Flast/4240111 to your computer and use it in GitHub Desktop.
Graceful Restart with Boost.Asio
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#define BOOST_RESULT_OF_USE_DECLTYPE | |
#include <chrono> | |
#include <thread> | |
#include <boost/mpi.hpp> | |
#include <iostream> | |
#include <sstream> | |
#include <utility> | |
#include <ratio> | |
#include <vector> | |
#include <string> | |
#include <iterator> | |
#include <boost/range/algorithm/find.hpp> | |
#include <stdexcept> | |
#include <system_error> | |
#include <boost/throw_exception.hpp> | |
#include <boost/exception/diagnostic_information.hpp> | |
#include <boost/asio.hpp> | |
namespace asio = boost::asio; | |
using asio::ip::tcp; | |
#include <sys/ioctl.h> | |
#include <net/if.h> | |
namespace detail { | |
unsigned long | |
scope_id_from_if(std::string if_) | |
{ | |
static asio::io_service ios; | |
static tcp::socket s(ios, tcp::v6()); | |
ifreq req; | |
strcpy(req.ifr_name, if_.c_str()); | |
if (ioctl(s.native_handle(), SIOCGIFINDEX, &req) < 0) | |
{ | |
perror("ioctl"); | |
BOOST_THROW_EXCEPTION(std::system_error(errno, std::system_category())); | |
} | |
return req.ifr_ifindex; | |
} | |
} // namespace detail | |
asio::ip::address_v6 | |
v6_from_string(std::string s) | |
{ | |
using asio::ip::address_v6; | |
auto i = boost::find(s, '%'); | |
auto addr = address_v6::from_string(std::string(s.begin(), i)); | |
if (i != s.end()) | |
{ | |
const std::string if_(std::next(i), s.end()); | |
addr.scope_id(detail::scope_id_from_if(if_)); | |
} | |
return addr; | |
} | |
int main(int argc, char *argv[]) try | |
{ | |
boost::mpi::environment env(argc, argv); | |
boost::mpi::communicator world; | |
if (argc != 3) | |
{ | |
BOOST_THROW_EXCEPTION( | |
std::invalid_argument("usage: $ bin server count")); | |
} | |
const auto prefix = [&]() -> std::string | |
{ | |
std::ostringstream ostr; | |
ostr << env.processor_name() | |
<< "[" << world.rank() << "/" << world.size() << "]: "; | |
return ostr.str(); | |
}(); | |
const size_t count = std::stoul(argv[2]); | |
//using asio::ip::address_v4; | |
//const tcp::endpoint server(address_v4::from_string(argv[1]), 5555); | |
const tcp::endpoint server(v6_from_string(argv[1]), 5555); | |
if (world.rank() == 0) | |
{ | |
std::cout << "Total MPI process: " << world.size() << std::endl; | |
} | |
world.barrier(); | |
std::this_thread::sleep_for(std::chrono::milliseconds(100 * world.rank())); | |
const size_t size = 100 * std::mega::num; | |
std::vector<char> buffer(std::mega::num * 10); | |
for (size_t i = 0; i < count; ++i) | |
{ | |
asio::io_service ios; | |
tcp::socket s(ios); | |
boost::system::error_code err; | |
if (s.connect(server, err)) | |
{ | |
std::ostringstream ostr; | |
ostr << prefix << "connection failed (" << err << ")\n"; | |
std::cout << ostr.str(); | |
continue; | |
} | |
for (size_t total = 0; total < size; ) | |
{ | |
if (err) | |
{ | |
std::ostringstream ostr; | |
ostr << prefix << "read_some failed (" << err << ")\n"; | |
std::cout << ostr.str(); | |
break; | |
} | |
total += s.read_some(asio::buffer(buffer), err); | |
} | |
} | |
} | |
catch (boost::exception &e) | |
{ | |
std::cout << diagnostic_information(e) << std::endl; | |
} | |
catch (std::exception &e) | |
{ | |
std::cout << "caught std::exception" << std::endl | |
<< " what: " << e.what() << std::endl; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <sys/types.h> | |
#include <sys/socket.h> | |
#include <sys/un.h> | |
#include <signal.h> | |
#include <functional> | |
#include <future> | |
#include <thread> | |
#include <type_traits> | |
#include <ratio> | |
#include <algorithm> | |
#include <iterator> | |
#include <array> | |
#include <vector> | |
#include <boost/range/algorithm/copy.hpp> | |
#include <string> | |
#include <iostream> | |
#include <boost/lexical_cast.hpp> | |
#include <boost/utility/value_init.hpp> | |
#include <boost/system/system_error.hpp> | |
#include <boost/throw_exception.hpp> | |
#include <boost/exception/diagnostic_information.hpp> | |
#include <boost/asio.hpp> | |
namespace asio = boost::asio; | |
using asio::ip::tcp; | |
using asio::local::datagram_protocol; | |
namespace std { | |
template <int n> | |
struct is_placeholder<boost::arg<n>> | |
: public integral_constant<int, n> {}; | |
template <int n> | |
struct is_placeholder<boost::arg<n>(*)()> | |
: public integral_constant<int, n> {}; | |
} // namespace std | |
namespace graceful { | |
namespace detail { | |
template <typename T, typename U> | |
inline typename std::enable_if<std::is_pointer<T>::value, T>::type | |
alias_cast(U *u) noexcept | |
{ | |
typedef typename std::remove_pointer<U>::type U_; | |
typedef typename | |
std::conditional<std::is_const<U_>::value, const void, void>::type | |
c_void; | |
typedef typename | |
std::conditional<std::is_volatile<U_>::value, volatile c_void, c_void>::type | |
cv_void; | |
return static_cast<T>(static_cast<cv_void *>(u)); | |
} | |
} // namespace graceful::detail | |
template <typename Acceptor> | |
void | |
send_acceptor(datagram_protocol::socket &s, Acceptor &acc) | |
{ | |
using detail::alias_cast; | |
std::cout << "[" << getpid() << "] Receive graceful signal" << std::endl; | |
s.connect(s.local_endpoint()); | |
constexpr auto buflen = CMSG_SPACE(sizeof(int)); | |
std::aligned_storage<buflen, alignof(cmsghdr)>::type cmsgbuf; | |
cmsghdr *cmsg = alias_cast<cmsghdr *>(std::addressof(cmsgbuf)); | |
cmsg->cmsg_len = CMSG_LEN(sizeof(int)); | |
cmsg->cmsg_level = SOL_SOCKET; | |
cmsg->cmsg_type = SCM_RIGHTS; | |
*alias_cast<int *>(CMSG_DATA(cmsg)) = acc.native_handle(); | |
boost::initialized<msghdr> msg_; | |
msghdr &msg = msg_; | |
msg.msg_control = std::addressof(cmsgbuf); | |
msg.msg_controllen = buflen; | |
if (sendmsg(s.native_handle(), &msg, 0) < 0) | |
{ | |
using namespace boost::system; | |
BOOST_THROW_EXCEPTION(system_error(errno, generic_category())); | |
} | |
} | |
template <typename Acceptor> | |
void | |
recv_acceptor(datagram_protocol::socket &s, Acceptor &a | |
, const typename Acceptor::protocol_type &p) | |
{ | |
using detail::alias_cast; | |
const char x[1] = {}; | |
s.send(asio::buffer(x)); // signal | |
std::cout << "[" << getpid() << "] Send graceful signal" << std::endl; | |
constexpr auto buflen = CMSG_SPACE(sizeof(int)); | |
std::aligned_storage<buflen, alignof(cmsghdr)>::type cmsgbuf; | |
boost::initialized<msghdr> msg_; | |
msghdr &msg = msg_; | |
msg.msg_control = std::addressof(cmsgbuf); | |
msg.msg_controllen = buflen; | |
msg.msg_flags = MSG_WAITALL; | |
if (recvmsg(s.native_handle(), &msg, 0) < 0) | |
{ | |
using namespace boost::system; | |
BOOST_THROW_EXCEPTION(system_error(errno, generic_category())); | |
} | |
const cmsghdr *cmsg = alias_cast<const cmsghdr *>(std::addressof(cmsgbuf)); | |
const int fd = *alias_cast<const int *>(CMSG_DATA(cmsg)); | |
a.assign(p, fd); | |
} | |
template <typename Acceptor> | |
std::shared_ptr<Acceptor> | |
create_acceptor_or_recv(asio::io_service &ios, std::string path | |
, const typename Acceptor::endpoint_type &ep) | |
{ | |
typedef datagram_protocol::socket local_socket; | |
std::shared_ptr<Acceptor> acc = nullptr; | |
auto l = std::make_shared<local_socket>(ios, datagram_protocol()); | |
datagram_protocol::endpoint dep(path); | |
try | |
{ | |
l->bind(dep); | |
acc = std::make_shared<Acceptor>(ios, ep); | |
} | |
catch (boost::system::system_error &) | |
{ | |
l->connect(dep); | |
unlink(path.c_str()); | |
l->bind(dep); | |
acc = std::make_shared<Acceptor>(ios); | |
recv_acceptor(*l, *acc, ep.protocol()); | |
} | |
l->async_receive(asio::null_buffers(), std::bind([=, &ios]() noexcept | |
{ | |
send_acceptor(*l, *acc); | |
ios.stop(); | |
})); | |
return move(acc); | |
} | |
} // namespace graceful | |
template <typename Acceptor> | |
Acceptor | |
dup_acceptor(asio::io_service &ios, Acceptor &acc) | |
{ | |
const int fd = dup(acc.native_handle()); | |
if (fd < 0) | |
{ | |
using namespace boost::system; | |
BOOST_THROW_EXCEPTION(system_error(errno, generic_category())); | |
} | |
return Acceptor(ios, acc.local_endpoint().protocol(), fd); | |
} | |
template <typename Acceptor, typename Buf> | |
void | |
start_accept(Acceptor, Buf); | |
namespace detail { | |
const struct | |
{ | |
struct any | |
{ | |
template <typename T> | |
explicit operator T() const noexcept { return {}; } | |
}; | |
template <typename... T> | |
any operator()(T &&...) const noexcept { return {}; } | |
} ignore; | |
} // namespace detail | |
template <typename Acceptor, typename Socket, typename Buf> | |
void | |
proc_accept(Acceptor acc, std::shared_ptr<Socket> s | |
, Buf buf, const boost::system::error_code &err) | |
{ | |
if (err != boost::asio::error::operation_aborted) | |
{ | |
start_accept(acc, buf); | |
} | |
if (!err) | |
{ | |
std::ostringstream ostr; | |
async_write(*s, asio::buffer(*buf), std::bind(detail::ignore, s, buf)); | |
} | |
} | |
template <typename Acceptor, typename Buf> | |
void | |
start_accept(Acceptor acc, Buf buf) | |
{ | |
using asio::placeholders::error; | |
typedef typename Acceptor::element_type::protocol_type::socket socket; | |
auto s = std::make_shared<socket>(acc->get_io_service()); | |
acc->async_accept(*s, [=](const boost::system::error_code &err) noexcept | |
{ | |
proc_accept(acc, s, buf, err); | |
}); | |
} | |
typedef std::shared_ptr<asio::io_service> shared_io_service; | |
template <typename Acceptor> | |
std::vector<std::tuple<std::thread, std::shared_ptr<Acceptor>, shared_io_service>> | |
setup_workers(size_t count, std::shared_ptr<Acceptor> acc) | |
{ | |
decltype(setup_workers(count, acc)) workers; | |
workers.resize(count); | |
auto buffer = std::make_shared<std::vector<char>>(100 * std::mega::num); | |
for (auto &thaios : workers) | |
{ | |
auto ios = std::make_shared<asio::io_service>(); | |
auto a = std::make_shared<Acceptor>(dup_acceptor(*ios, *acc)); | |
std::thread([=]() noexcept | |
{ | |
start_accept(a, buffer); | |
ios->run(); | |
}).swap(std::get<0>(thaios)); | |
std::get<1>(thaios) = a; | |
std::get<2>(thaios) = ios; | |
} | |
std::cout << "[" << getpid() << "] Total running workers: " << workers.size() << std::endl; | |
return move(workers); | |
} | |
int main(int argc, char **argv) try | |
{ | |
const auto path = "test"; | |
const tcp::endpoint accpoint(tcp::v6(), 5555); | |
size_t wcount = std::thread::hardware_concurrency(); | |
if (2 <= argc) { wcount = std::stoi(argv[1]); } | |
asio::io_service ios; | |
using graceful::create_acceptor_or_recv; | |
auto acc = create_acceptor_or_recv<tcp::acceptor>(ios, path, accpoint); | |
acc->set_option(tcp::acceptor::reuse_address(true)); | |
auto workers = setup_workers(wcount, acc); | |
asio::signal_set s(ios, SIGTERM, SIGINT); | |
s.async_wait(std::bind([&]() noexcept | |
{ | |
ios.stop(); | |
for (auto &thaios : workers) | |
{ | |
std::get<2>(thaios)->stop(); | |
} | |
})); | |
ios.run(); | |
for (auto &thaios : workers) | |
{ | |
std::get<1>(thaios)->cancel(); | |
} | |
std::cout << "[" << getpid() << "] All acceptor stopped" << std::endl; | |
for (auto &thaios : workers) | |
{ | |
std::get<0>(thaios).join(); | |
} | |
std::cout << "[" << getpid() << "] Terminate" << std::endl; | |
} | |
catch (boost::exception &e) | |
{ | |
std::cout << diagnostic_information(e) << std::endl; | |
} | |
catch (std::exception &e) | |
{ | |
std::cout << "caught std::exception" << std::endl | |
<< " what: " << e.what() << std::endl; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment