Skip to content

Instantly share code, notes, and snippets.

@Flast
Created December 8, 2012 12:36
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Flast/4240111 to your computer and use it in GitHub Desktop.
Save Flast/4240111 to your computer and use it in GitHub Desktop.
Graceful Restart with Boost.Asio
#define BOOST_RESULT_OF_USE_DECLTYPE
#include <chrono>
#include <thread>
#include <boost/mpi.hpp>
#include <iostream>
#include <sstream>
#include <utility>
#include <ratio>
#include <vector>
#include <string>
#include <iterator>
#include <boost/range/algorithm/find.hpp>
#include <stdexcept>
#include <system_error>
#include <boost/throw_exception.hpp>
#include <boost/exception/diagnostic_information.hpp>
#include <boost/asio.hpp>
namespace asio = boost::asio;
using asio::ip::tcp;
#include <sys/ioctl.h>
#include <net/if.h>
namespace detail {
unsigned long
scope_id_from_if(std::string if_)
{
static asio::io_service ios;
static tcp::socket s(ios, tcp::v6());
ifreq req;
strcpy(req.ifr_name, if_.c_str());
if (ioctl(s.native_handle(), SIOCGIFINDEX, &req) < 0)
{
perror("ioctl");
BOOST_THROW_EXCEPTION(std::system_error(errno, std::system_category()));
}
return req.ifr_ifindex;
}
} // namespace detail
asio::ip::address_v6
v6_from_string(std::string s)
{
using asio::ip::address_v6;
auto i = boost::find(s, '%');
auto addr = address_v6::from_string(std::string(s.begin(), i));
if (i != s.end())
{
const std::string if_(std::next(i), s.end());
addr.scope_id(detail::scope_id_from_if(if_));
}
return addr;
}
int main(int argc, char *argv[]) try
{
boost::mpi::environment env(argc, argv);
boost::mpi::communicator world;
if (argc != 3)
{
BOOST_THROW_EXCEPTION(
std::invalid_argument("usage: $ bin server count"));
}
const auto prefix = [&]() -> std::string
{
std::ostringstream ostr;
ostr << env.processor_name()
<< "[" << world.rank() << "/" << world.size() << "]: ";
return ostr.str();
}();
const size_t count = std::stoul(argv[2]);
//using asio::ip::address_v4;
//const tcp::endpoint server(address_v4::from_string(argv[1]), 5555);
const tcp::endpoint server(v6_from_string(argv[1]), 5555);
if (world.rank() == 0)
{
std::cout << "Total MPI process: " << world.size() << std::endl;
}
world.barrier();
std::this_thread::sleep_for(std::chrono::milliseconds(100 * world.rank()));
const size_t size = 100 * std::mega::num;
std::vector<char> buffer(std::mega::num * 10);
for (size_t i = 0; i < count; ++i)
{
asio::io_service ios;
tcp::socket s(ios);
boost::system::error_code err;
if (s.connect(server, err))
{
std::ostringstream ostr;
ostr << prefix << "connection failed (" << err << ")\n";
std::cout << ostr.str();
continue;
}
for (size_t total = 0; total < size; )
{
if (err)
{
std::ostringstream ostr;
ostr << prefix << "read_some failed (" << err << ")\n";
std::cout << ostr.str();
break;
}
total += s.read_some(asio::buffer(buffer), err);
}
}
}
catch (boost::exception &e)
{
std::cout << diagnostic_information(e) << std::endl;
}
catch (std::exception &e)
{
std::cout << "caught std::exception" << std::endl
<< " what: " << e.what() << std::endl;
}
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <signal.h>
#include <functional>
#include <future>
#include <thread>
#include <type_traits>
#include <ratio>
#include <algorithm>
#include <iterator>
#include <array>
#include <vector>
#include <boost/range/algorithm/copy.hpp>
#include <string>
#include <iostream>
#include <boost/lexical_cast.hpp>
#include <boost/utility/value_init.hpp>
#include <boost/system/system_error.hpp>
#include <boost/throw_exception.hpp>
#include <boost/exception/diagnostic_information.hpp>
#include <boost/asio.hpp>
namespace asio = boost::asio;
using asio::ip::tcp;
using asio::local::datagram_protocol;
namespace std {
template <int n>
struct is_placeholder<boost::arg<n>>
: public integral_constant<int, n> {};
template <int n>
struct is_placeholder<boost::arg<n>(*)()>
: public integral_constant<int, n> {};
} // namespace std
namespace graceful {
namespace detail {
template <typename T, typename U>
inline typename std::enable_if<std::is_pointer<T>::value, T>::type
alias_cast(U *u) noexcept
{
typedef typename std::remove_pointer<U>::type U_;
typedef typename
std::conditional<std::is_const<U_>::value, const void, void>::type
c_void;
typedef typename
std::conditional<std::is_volatile<U_>::value, volatile c_void, c_void>::type
cv_void;
return static_cast<T>(static_cast<cv_void *>(u));
}
} // namespace graceful::detail
template <typename Acceptor>
void
send_acceptor(datagram_protocol::socket &s, Acceptor &acc)
{
using detail::alias_cast;
std::cout << "[" << getpid() << "] Receive graceful signal" << std::endl;
s.connect(s.local_endpoint());
constexpr auto buflen = CMSG_SPACE(sizeof(int));
std::aligned_storage<buflen, alignof(cmsghdr)>::type cmsgbuf;
cmsghdr *cmsg = alias_cast<cmsghdr *>(std::addressof(cmsgbuf));
cmsg->cmsg_len = CMSG_LEN(sizeof(int));
cmsg->cmsg_level = SOL_SOCKET;
cmsg->cmsg_type = SCM_RIGHTS;
*alias_cast<int *>(CMSG_DATA(cmsg)) = acc.native_handle();
boost::initialized<msghdr> msg_;
msghdr &msg = msg_;
msg.msg_control = std::addressof(cmsgbuf);
msg.msg_controllen = buflen;
if (sendmsg(s.native_handle(), &msg, 0) < 0)
{
using namespace boost::system;
BOOST_THROW_EXCEPTION(system_error(errno, generic_category()));
}
}
template <typename Acceptor>
void
recv_acceptor(datagram_protocol::socket &s, Acceptor &a
, const typename Acceptor::protocol_type &p)
{
using detail::alias_cast;
const char x[1] = {};
s.send(asio::buffer(x)); // signal
std::cout << "[" << getpid() << "] Send graceful signal" << std::endl;
constexpr auto buflen = CMSG_SPACE(sizeof(int));
std::aligned_storage<buflen, alignof(cmsghdr)>::type cmsgbuf;
boost::initialized<msghdr> msg_;
msghdr &msg = msg_;
msg.msg_control = std::addressof(cmsgbuf);
msg.msg_controllen = buflen;
msg.msg_flags = MSG_WAITALL;
if (recvmsg(s.native_handle(), &msg, 0) < 0)
{
using namespace boost::system;
BOOST_THROW_EXCEPTION(system_error(errno, generic_category()));
}
const cmsghdr *cmsg = alias_cast<const cmsghdr *>(std::addressof(cmsgbuf));
const int fd = *alias_cast<const int *>(CMSG_DATA(cmsg));
a.assign(p, fd);
}
template <typename Acceptor>
std::shared_ptr<Acceptor>
create_acceptor_or_recv(asio::io_service &ios, std::string path
, const typename Acceptor::endpoint_type &ep)
{
typedef datagram_protocol::socket local_socket;
std::shared_ptr<Acceptor> acc = nullptr;
auto l = std::make_shared<local_socket>(ios, datagram_protocol());
datagram_protocol::endpoint dep(path);
try
{
l->bind(dep);
acc = std::make_shared<Acceptor>(ios, ep);
}
catch (boost::system::system_error &)
{
l->connect(dep);
unlink(path.c_str());
l->bind(dep);
acc = std::make_shared<Acceptor>(ios);
recv_acceptor(*l, *acc, ep.protocol());
}
l->async_receive(asio::null_buffers(), std::bind([=, &ios]() noexcept
{
send_acceptor(*l, *acc);
ios.stop();
}));
return move(acc);
}
} // namespace graceful
template <typename Acceptor>
Acceptor
dup_acceptor(asio::io_service &ios, Acceptor &acc)
{
const int fd = dup(acc.native_handle());
if (fd < 0)
{
using namespace boost::system;
BOOST_THROW_EXCEPTION(system_error(errno, generic_category()));
}
return Acceptor(ios, acc.local_endpoint().protocol(), fd);
}
template <typename Acceptor, typename Buf>
void
start_accept(Acceptor, Buf);
namespace detail {
const struct
{
struct any
{
template <typename T>
explicit operator T() const noexcept { return {}; }
};
template <typename... T>
any operator()(T &&...) const noexcept { return {}; }
} ignore;
} // namespace detail
template <typename Acceptor, typename Socket, typename Buf>
void
proc_accept(Acceptor acc, std::shared_ptr<Socket> s
, Buf buf, const boost::system::error_code &err)
{
if (err != boost::asio::error::operation_aborted)
{
start_accept(acc, buf);
}
if (!err)
{
std::ostringstream ostr;
async_write(*s, asio::buffer(*buf), std::bind(detail::ignore, s, buf));
}
}
template <typename Acceptor, typename Buf>
void
start_accept(Acceptor acc, Buf buf)
{
using asio::placeholders::error;
typedef typename Acceptor::element_type::protocol_type::socket socket;
auto s = std::make_shared<socket>(acc->get_io_service());
acc->async_accept(*s, [=](const boost::system::error_code &err) noexcept
{
proc_accept(acc, s, buf, err);
});
}
typedef std::shared_ptr<asio::io_service> shared_io_service;
template <typename Acceptor>
std::vector<std::tuple<std::thread, std::shared_ptr<Acceptor>, shared_io_service>>
setup_workers(size_t count, std::shared_ptr<Acceptor> acc)
{
decltype(setup_workers(count, acc)) workers;
workers.resize(count);
auto buffer = std::make_shared<std::vector<char>>(100 * std::mega::num);
for (auto &thaios : workers)
{
auto ios = std::make_shared<asio::io_service>();
auto a = std::make_shared<Acceptor>(dup_acceptor(*ios, *acc));
std::thread([=]() noexcept
{
start_accept(a, buffer);
ios->run();
}).swap(std::get<0>(thaios));
std::get<1>(thaios) = a;
std::get<2>(thaios) = ios;
}
std::cout << "[" << getpid() << "] Total running workers: " << workers.size() << std::endl;
return move(workers);
}
int main(int argc, char **argv) try
{
const auto path = "test";
const tcp::endpoint accpoint(tcp::v6(), 5555);
size_t wcount = std::thread::hardware_concurrency();
if (2 <= argc) { wcount = std::stoi(argv[1]); }
asio::io_service ios;
using graceful::create_acceptor_or_recv;
auto acc = create_acceptor_or_recv<tcp::acceptor>(ios, path, accpoint);
acc->set_option(tcp::acceptor::reuse_address(true));
auto workers = setup_workers(wcount, acc);
asio::signal_set s(ios, SIGTERM, SIGINT);
s.async_wait(std::bind([&]() noexcept
{
ios.stop();
for (auto &thaios : workers)
{
std::get<2>(thaios)->stop();
}
}));
ios.run();
for (auto &thaios : workers)
{
std::get<1>(thaios)->cancel();
}
std::cout << "[" << getpid() << "] All acceptor stopped" << std::endl;
for (auto &thaios : workers)
{
std::get<0>(thaios).join();
}
std::cout << "[" << getpid() << "] Terminate" << std::endl;
}
catch (boost::exception &e)
{
std::cout << diagnostic_information(e) << std::endl;
}
catch (std::exception &e)
{
std::cout << "caught std::exception" << std::endl
<< " what: " << e.what() << std::endl;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment