Last active
April 20, 2021 21:53
-
-
Save palacaze/8201f4848fd6208337c853c04f02f290 to your computer and use it in GitHub Desktop.
Async Tcp reader base class
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <string_view> | |
#include <pal/core/cast.hpp> | |
#include <pal/core/print-helpers.hpp> | |
#include <pal/core/container/container.hpp> | |
#include "tcp-reader.hpp" | |
namespace pal::tcp { | |
Reader::Reader(const std::string &name, Config conf) | |
: m_logger{log::get(name)} | |
, m_conf{conf} | |
, m_ioc{} | |
, m_work{net::make_work_guard(m_ioc)} | |
, m_socket{m_ioc} | |
, m_state{State::Disconnected} | |
, m_deadline{m_ioc} | |
, m_resolver{m_ioc} | |
, m_server_port{0} | |
, m_retries{m_conf.conn_retries} | |
, m_timeout{m_conf.conn_timeout_min} | |
, m_connected{false} | |
, m_read_strand{m_ioc.get_executor()} | |
{ | |
startThread(); | |
} | |
Reader::~Reader() | |
{ | |
m_resolver.cancel(); | |
disconnect(); | |
stopThread(); | |
} | |
ExecutorType Reader::getExecutor() | |
{ | |
return m_ioc.get_executor(); | |
} | |
void Reader::connect(const std::string &addr, uint16 port) | |
{ | |
m_server_address = addr; | |
m_server_port = port; | |
m_timeout = m_conf.conn_timeout_min; | |
m_retries = m_conf.conn_retries; | |
stop(); | |
start(); | |
} | |
void Reader::disconnect() | |
{ | |
m_connected = false; | |
stop(); | |
} | |
void Reader::start() | |
{ | |
// Nothing left to do | |
if (m_retries <= 0) { | |
disconnect(); | |
PAL_ERROR("Could not connect to {}:{}, aborting", m_server_address, m_server_port); | |
return; | |
} | |
// deadline for the resolution operation | |
startTimer(); | |
m_state = State::Resolving; | |
PAL_DEBUG("Connection attempt n°{}", m_conf.conn_retries - m_retries + 1); | |
m_resolver.async_resolve( | |
tcp::v4(), | |
m_server_address, | |
std::to_string(int(m_server_port)), | |
[self = this](const error_code &ec, auto results) { | |
if (ec) { | |
log::warn(self->m_logger, | |
"Could not resolve address {}", | |
self->m_server_address); | |
// checkDeadline will restart the resolution | |
} else { | |
log::info(self->m_logger, | |
"Resolved address {} to {}:{}", | |
self->m_server_address, | |
results.begin()->endpoint().address().to_string(), | |
results.begin()->endpoint().port()); | |
self->startConnect(std::move(results)); | |
} | |
}); | |
} | |
void Reader::stop() | |
{ | |
const State old_state = m_state; | |
m_state = State::Disconnecting; | |
error_code ec; | |
m_socket.shutdown(net::socket_base::shutdown_both, ec); | |
m_socket.close(ec); | |
m_resolver.cancel(); | |
m_deadline.cancel(ec); | |
if (old_state == State::Connected) { | |
PAL_INFO("Disconnecting from server"); | |
onDisconnected(); | |
} | |
m_state = State::Disconnected; | |
} | |
void Reader::startThread() | |
{ | |
// start thread to process the asio events | |
try { | |
m_thread = std::make_unique<std::thread>([this] { | |
#ifdef __linux__ | |
std::string thread_name = fmt::format("{}", m_logger->name()); | |
thread_name.resize(15); | |
pthread_setname_np(pthread_self(), thread_name.c_str()); | |
#endif | |
try { | |
m_ioc.run(); | |
} catch (const std::exception &e) { | |
PAL_ERROR("Could not create io thread: {}", e.what()); | |
} | |
}); | |
} catch (...) { | |
PAL_ERROR("Could not create gps io thread"); | |
} | |
} | |
void Reader::stopThread() | |
{ | |
try { | |
m_work.reset(); | |
m_ioc.stop(); | |
if (m_thread && m_thread->joinable()) { | |
m_thread->join(); | |
} | |
} catch (...) {} | |
m_thread.reset(); | |
} | |
void Reader::startTimer() | |
{ | |
m_deadline.expires_after(time::msecs(m_timeout)); | |
m_deadline.async_wait([this] (const auto &e) { checkDeadline(e); }); | |
} | |
void Reader::stopTimer() | |
{ | |
error_code ec; | |
m_deadline.cancel(ec); | |
} | |
void Reader::updateTimeout() | |
{ | |
// exponential backoff with limit. | |
m_timeout = pal::clamp(double(m_timeout) * m_conf.conn_backoff_rate, | |
m_conf.conn_timeout_min, | |
m_conf.conn_timeout_max); | |
m_retries--; | |
} | |
bool Reader::isConnected() const | |
{ | |
return m_connected && m_socket.is_open(); | |
} | |
log::LoggerPtr Reader::logger() const | |
{ | |
return m_logger; | |
} | |
const std::string& Reader::address() const | |
{ | |
return m_server_address; | |
} | |
uint16 Reader::port() const | |
{ | |
return m_server_port; | |
} | |
void Reader::startConnect(const TcpResolver::results_type &endpoints) | |
{ | |
// deadline for the connect operation | |
startTimer(); | |
m_state = State::Connecting; | |
net::async_connect(m_socket, | |
endpoints, | |
[self = this] (auto ec, auto endpoint) | |
{ | |
if (ec) { | |
log::error(self->m_logger, | |
"Could not connect to {}:{}, {}", | |
self->m_server_address, | |
self->m_server_port, | |
ec.message()); | |
// checkDeadline will restart the resolution | |
return; | |
} | |
self->m_state = State::Connected; | |
setSocketOptions(self->m_socket); | |
// start reading | |
net::dispatch(self->m_read_strand, [self] { | |
self->asyncReadSome(); | |
}); | |
log::info(self->m_logger, | |
"connected to {}:{}", | |
endpoint.address().to_string(), | |
endpoint.port()); | |
self->m_connected = true; | |
self->onConnected(); | |
}); | |
} | |
void Reader::checkDeadline(error_code ec) | |
{ | |
// timer was cancelled | |
if (ec == net::error::operation_aborted) { | |
return; | |
} | |
// resolving or connecting failed, update the timeout and try again | |
if (m_state == State::Resolving || m_state == State::Connecting) { | |
updateTimeout(); | |
start(); | |
return; | |
} | |
// We are disconnected, nothing left to do | |
if (m_state == State::Disconnecting || m_state == State::Disconnected) { | |
stopTimer(); | |
return; | |
} | |
// Connected | |
if (m_state == State::Connected) { | |
stopTimer(); | |
} | |
} | |
void Reader::signalReadError(error_code ec, std::string msg) | |
{ | |
if (!ec) { | |
return; | |
} | |
// wanted | |
if (ec == net::error::operation_aborted && m_state == State::Disconnecting) { | |
return; | |
} | |
onError(ec); | |
if (msg.empty()) { | |
msg = "Unknown"; | |
} | |
PAL_TRACE("{}: {}", msg, ec); | |
} | |
void Reader::asyncReadSome() | |
{ | |
asio::async_read( | |
m_socket, | |
net::dynamic_buffer(m_buf), | |
net::transfer_at_least(1), | |
net::bind_executor(m_read_strand, | |
[self = this] (auto && ec, size_t /*sz*/) | |
{ | |
if (ec) { | |
self->signalReadError(ec, "Error while reading some data"); | |
return; | |
} | |
self->onDataReceived(std::move(self->m_buf)); | |
self->m_buf = {}; | |
self->asyncReadSome(); | |
})); | |
} | |
void Reader::onConnected() {} | |
void Reader::onDisconnected() {} | |
void Reader::onError(error_code) {} | |
void Reader::onDataReceived(std::vector<byte> /*data*/) {} | |
} // namespace pal::tcp |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include <atomic> | |
#include <limits> | |
#include <memory> | |
#include <pal/core/error.hpp> | |
#include <pal/core/global.hpp> | |
#include <pal/core/log.hpp> | |
#include <pal/core/tcp/tcp-defs.hpp> | |
#include <pal/core/type/enum.hpp> | |
/** | |
* @file tcp-client.hpp | |
* An asynchronous tcp client base class | |
*/ | |
namespace pal::tcp { | |
namespace net = asio; | |
using tcp = net::ip::tcp; | |
using error_code = std::error_code; | |
/** | |
* These types are defined explicitely here to avoid using the polymorphic | |
* executor used by default with recent ASIO releases, and which incurs non | |
* negligible slowdowns. | |
*/ | |
using ExecutorType = net::io_context::executor_type; | |
using TcpSocket = net::basic_stream_socket<tcp, ExecutorType>; | |
using TcpAcceptor = net::basic_socket_acceptor<tcp, ExecutorType>; | |
using TcpResolver = net::ip::basic_resolver<tcp, ExecutorType>; | |
/** | |
* The states a client may have. | |
*/ | |
PAL_DECLARE_ENUM(State, int, | |
Disconnected, ///< Disconnected | |
Resolving, ///< Resolving the server ip | |
Connecting, ///< Connecting to the server | |
Connected, ///< Connected to the server | |
Disconnecting ///< Disconnecting from the server | |
) | |
/** | |
* Simple Asynchronous TCP Reader base class to work read data from a tcp socket. | |
* All operations are performed asynchronously, it keeps trying to reconnect until | |
* it either succeeds or the connection strategy fails. | |
*/ | |
class PAL_CORE_EXPORT Reader { | |
public: | |
/** | |
* Client configuration settings | |
*/ | |
struct Config { | |
Config() noexcept {} | |
uint32 conn_timeout_min = 1000; ///< Minimun connection timeout in ms | |
uint32 conn_timeout_max = 20'000; ///< Maximun connection timeout in ms | |
double conn_backoff_rate = 1.0; ///< Exponential backoff rate, 1.0 means constant | |
int32 conn_retries = INT32_MAX; ///< Number of connection retries | |
}; | |
public: | |
/// Destructor | |
virtual ~Reader(); | |
/// The associated executor | |
ExecutorType getExecutor(); | |
/** | |
* Connect to a server asynchronously. | |
* | |
* The function returns immediately. | |
*/ | |
void connect(const std::string &addr, uint16 port); | |
/// Disconnect from the server | |
void disconnect(); | |
/// Test whether we are connected | |
bool isConnected() const; | |
/// Message Logger | |
log::LoggerPtr logger() const; | |
/// Server address | |
const std::string& address() const; | |
/// server port | |
uint16 port() const; | |
protected: | |
/** | |
* Create a new Tcp Reader. | |
* | |
* @param name The name of the Reader, for logging purpose | |
* @param conf The configuration of the reader | |
* @return a new reader | |
*/ | |
explicit Reader(const std::string &name, Config conf); | |
/// Logger | |
log::LoggerPtr m_logger; | |
/** | |
* Method called once the client is connected to the server | |
*/ | |
virtual void onConnected(); | |
/** | |
* Method called when the client is disconnected from the server | |
*/ | |
virtual void onDisconnected(); | |
/** | |
* Method called when a data reception error occurs | |
*/ | |
virtual void onError(error_code); | |
/** | |
* Method called when data is received | |
*/ | |
virtual void onDataReceived(std::vector<byte> data); | |
private: | |
void start(); | |
void stop(); | |
void startThread(); | |
void stopThread(); | |
void startTimer(); | |
void stopTimer(); | |
void updateTimeout(); | |
void startConnect(const TcpResolver::results_type &endpoints); | |
void checkDeadline(error_code ec); | |
void signalReadError(error_code ec, std::string msg); | |
void asyncReadSome(); | |
void parseMessages(); | |
void consume(size_t n); | |
private: | |
const Config m_conf; | |
net::io_context m_ioc; | |
net::executor_work_guard<ExecutorType> m_work; | |
TcpSocket m_socket; | |
std::atomic<State> m_state; | |
net::steady_timer m_deadline; | |
TcpResolver m_resolver; | |
std::string m_server_address; | |
uint16 m_server_port; | |
std::atomic<int32> m_retries; | |
std::atomic<uint32> m_timeout; | |
std::atomic<bool> m_connected; | |
std::unique_ptr<std::thread> m_thread; | |
std::vector<byte> m_buf; | |
net::strand<ExecutorType> m_read_strand; | |
}; | |
} // namespace pal::tcp |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment