Skip to content

Instantly share code, notes, and snippets.

@palacaze
Last active April 20, 2021 21:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save palacaze/8201f4848fd6208337c853c04f02f290 to your computer and use it in GitHub Desktop.
Save palacaze/8201f4848fd6208337c853c04f02f290 to your computer and use it in GitHub Desktop.
Async Tcp reader base class
#include <string_view>
#include <pal/core/cast.hpp>
#include <pal/core/print-helpers.hpp>
#include <pal/core/container/container.hpp>
#include "tcp-reader.hpp"
namespace pal::tcp {
/**
 * Build a reader and immediately spawn the thread that runs its io_context.
 *
 * @param name Name handed to log::get() to obtain the logger
 * @param conf Connection settings, copied into the reader
 */
Reader::Reader(const std::string &name, Config conf)
    : m_logger(log::get(name))
    , m_conf(conf)
    , m_ioc()
    , m_work(net::make_work_guard(m_ioc))
    , m_socket(m_ioc)
    , m_state(State::Disconnected)
    , m_deadline(m_ioc)
    , m_resolver(m_ioc)
    , m_server_port(0)
    , m_retries(m_conf.conn_retries)
    , m_timeout(m_conf.conn_timeout_min)
    , m_connected(false)
    , m_read_strand(m_ioc.get_executor())
{
    // Every member named above is initialised by now, so the worker
    // thread can safely start running the io_context.
    startThread();
}
/**
 * Tear the reader down: abort pending work, close the connection, then
 * stop and join the io thread.
 *
 * NOTE(review): disconnect() ends up in stop(), which may call the virtual
 * onDisconnected(). Inside a destructor that call resolves to Reader's own
 * (empty) implementation, never to an override of a derived class.
 */
Reader::~Reader()
{
// abort a name resolution that may still be in flight
m_resolver.cancel();
// clears the connected flag, then stop() closes the socket and cancels the timer
disconnect();
// releases the work guard, stops the io_context and joins the thread
stopThread();
}
/// Hand out the executor of the io_context owned by this reader.
ExecutorType Reader::getExecutor()
{
    ExecutorType executor = m_ioc.get_executor();
    return executor;
}
/**
 * Point the reader at a server and begin a fresh connection sequence.
 *
 * The retry budget and the timeout are reset to their configured initial
 * values, whatever is currently going on is stopped, and a new attempt is
 * started.
 *
 * @param addr Host name or address of the server
 * @param port TCP port of the server
 */
void Reader::connect(const std::string &addr, uint16 port)
{
    // remember the target
    m_server_port = port;
    m_server_address = addr;

    // start over with a full retry budget and the shortest timeout
    m_retries.store(m_conf.conn_retries);
    m_timeout.store(m_conf.conn_timeout_min);

    // drop the current connection (or attempt), then try again
    stop();
    start();
}
/// Mark the reader as not connected and shut the connection down.
void Reader::disconnect()
{
    m_connected.store(false);
    stop();
}
void Reader::start()
{
// Nothing left to do
if (m_retries <= 0) {
disconnect();
PAL_ERROR("Could not connect to {}:{}, aborting", m_server_address, m_server_port);
return;
}
// deadline for the resolution operation
startTimer();
m_state = State::Resolving;
PAL_DEBUG("Connection attempt n°{}", m_conf.conn_retries - m_retries + 1);
m_resolver.async_resolve(
tcp::v4(),
m_server_address,
std::to_string(int(m_server_port)),
[self = this](const error_code &ec, auto results) {
if (ec) {
log::warn(self->m_logger,
"Could not resolve address {}",
self->m_server_address);
// checkDeadline will restart the resolution
} else {
log::info(self->m_logger,
"Resolved address {} to {}:{}",
self->m_server_address,
results.begin()->endpoint().address().to_string(),
results.begin()->endpoint().port());
self->startConnect(std::move(results));
}
});
}
void Reader::stop()
{
const State old_state = m_state;
m_state = State::Disconnecting;
error_code ec;
m_socket.shutdown(net::socket_base::shutdown_both, ec);
m_socket.close(ec);
m_resolver.cancel();
m_deadline.cancel(ec);
if (old_state == State::Connected) {
PAL_INFO("Disconnecting from server");
onDisconnected();
}
m_state = State::Disconnected;
}
/**
 * Spawn the thread that runs the io_context and thereby executes every
 * completion handler of this reader.
 *
 * Failures are logged, never propagated: neither a thread creation error
 * nor an exception escaping a handler leaves this function or the thread.
 */
void Reader::startThread()
{
    // start thread to process the asio events
    try {
        m_thread = std::make_unique<std::thread>([this] {
#ifdef __linux__
            // pthread_setname_np() accepts at most 15 characters plus the
            // terminating NUL; only truncate, never pad shorter names.
            std::string thread_name = fmt::format("{}", m_logger->name());
            if (thread_name.size() > 15) {
                thread_name.resize(15);
            }
            pthread_setname_np(pthread_self(), thread_name.c_str());
#endif
            try {
                m_ioc.run();
            } catch (const std::exception &e) {
                // The thread exists at this point; what failed is a handler
                // executed by run().
                PAL_ERROR("io thread terminated by an exception: {}", e.what());
            }
        });
    } catch (const std::exception &e) {
        PAL_ERROR("Could not create io thread: {}", e.what());
    } catch (...) {
        PAL_ERROR("Could not create io thread");
    }
}
/**
 * Let the io_context stop and wait for the io thread to finish.
 *
 * Best effort: any exception raised while doing so is discarded, and the
 * thread object is released in every case.
 */
void Reader::stopThread()
{
    try {
        // without the work guard and after stop(), run() returns
        m_work.reset();
        m_ioc.stop();

        const bool must_join = m_thread && m_thread->joinable();
        if (must_join) {
            m_thread->join();
        }
    } catch (...) {
        // deliberately ignored
    }

    m_thread.reset();
}
/// Arm the deadline timer with the current timeout; checkDeadline() runs when the wait completes.
void Reader::startTimer()
{
    const auto delay = time::msecs(m_timeout);
    m_deadline.expires_after(delay);

    auto on_expiry = [this] (const auto &e) { checkDeadline(e); };
    m_deadline.async_wait(on_expiry);
}
void Reader::stopTimer()
{
error_code ec;
m_deadline.cancel(ec);
}
/**
 * Account for one failed attempt: scale the timeout by the configured
 * backoff rate, keep it within the configured bounds, and use up one retry.
 */
void Reader::updateTimeout()
{
    // exponential backoff with limit.
    m_timeout = pal::clamp(static_cast<double>(m_timeout.load()) * m_conf.conn_backoff_rate,
                           m_conf.conn_timeout_min,
                           m_conf.conn_timeout_max);

    // one attempt less in the budget
    --m_retries;
}
/// True when the connected flag is set and the socket is open.
bool Reader::isConnected() const
{
    if (!m_connected) {
        return false;
    }
    return m_socket.is_open();
}
/// The logger this reader writes its messages to.
log::LoggerPtr Reader::logger() const
{
    log::LoggerPtr handle = m_logger;
    return handle;
}
/// Server address given to the last connect() call (empty before the first one).
const std::string& Reader::address() const
{
    const std::string &current = m_server_address;
    return current;
}
/// Server port given to the last connect() call (0 before the first one).
uint16 Reader::port() const
{
    const uint16 current = m_server_port;
    return current;
}
/**
 * Try to connect the socket to one of the resolved endpoints.
 *
 * On success the reader becomes Connected, the read loop is started on the
 * read strand and onConnected() is invoked. On failure nothing is retried
 * here: the deadline timer armed below fires checkDeadline(), which starts
 * a new attempt.
 *
 * @param endpoints Endpoints produced by the resolver
 */
void Reader::startConnect(const TcpResolver::results_type &endpoints)
{
    // deadline for the connect operation
    startTimer();
    m_state = State::Connecting;

    net::async_connect(m_socket,
                       endpoints,
                       [self = this] (auto ec, auto endpoint)
    {
        // Cancelled by stop() or superseded by a newer attempt: not a
        // connection failure, so do not report one.
        if (ec == net::error::operation_aborted) {
            return;
        }

        if (ec) {
            log::error(self->m_logger,
                       "Could not connect to {}:{}, {}",
                       self->m_server_address,
                       self->m_server_port,
                       ec.message());
            // checkDeadline will restart the resolution
            return;
        }

        // A success that completes after the reader moved on (stop() or a
        // restarted attempt) is stale and must not mark us Connected.
        if (self->m_state != State::Connecting) {
            return;
        }

        // The attempt is over, so its deadline no longer applies.
        self->stopTimer();

        self->m_state = State::Connected;
        setSocketOptions(self->m_socket);

        // start reading
        net::dispatch(self->m_read_strand, [self] {
            self->asyncReadSome();
        });

        log::info(self->m_logger,
                  "connected to {}:{}",
                  endpoint.address().to_string(),
                  endpoint.port());

        self->m_connected = true;
        self->onConnected();
    });
}
/**
 * Completion handler of the deadline timer.
 *
 * A cancelled wait is ignored. When the deadline expires while an attempt
 * is still resolving or connecting, the attempt is counted as failed and a
 * new one is started; in any other state the timer is simply stopped.
 *
 * @param ec Result of the timer wait
 */
void Reader::checkDeadline(error_code ec)
{
    // timer was cancelled
    if (ec == net::error::operation_aborted) {
        return;
    }

    const State current = m_state;

    // resolving or connecting failed, update the timeout and try again
    const bool attempt_pending = current == State::Resolving
                              || current == State::Connecting;
    if (attempt_pending) {
        updateTimeout();
        start();
        return;
    }

    // Disconnecting, disconnected or connected: no deadline to enforce
    const bool timer_obsolete = current == State::Disconnecting
                             || current == State::Disconnected
                             || current == State::Connected;
    if (timer_obsolete) {
        stopTimer();
    }
}
/**
 * Report a failed read to the derived class through onError().
 *
 * Nothing happens when @p ec carries no error, or when the read was
 * aborted on purpose because the connection is being, or has been, shut
 * down.
 *
 * @param ec  Error reported by the read operation
 * @param msg Context for the trace message; "Unknown" is used when empty
 */
void Reader::signalReadError(error_code ec, std::string msg)
{
    if (!ec) {
        return;
    }

    // An aborted read is expected whenever we are no longer Connected.
    // By the time this handler runs, stop() has normally left Disconnecting
    // and reached Disconnected (or Resolving when connect() restarted), so
    // testing for Disconnecting alone let intentional aborts reach onError().
    if (ec == net::error::operation_aborted && m_state != State::Connected) {
        return;
    }

    onError(ec);

    if (msg.empty()) {
        msg = "Unknown";
    }

    PAL_TRACE("{}: {}", msg, ec);
}
void Reader::asyncReadSome()
{
asio::async_read(
m_socket,
net::dynamic_buffer(m_buf),
net::transfer_at_least(1),
net::bind_executor(m_read_strand,
[self = this] (auto && ec, size_t /*sz*/)
{
if (ec) {
self->signalReadError(ec, "Error while reading some data");
return;
}
self->onDataReceived(std::move(self->m_buf));
self->m_buf = {};
self->asyncReadSome();
}));
}
/// Default hook: nothing to do once the connection is established.
void Reader::onConnected()
{
}

/// Default hook: nothing to do when the connection is shut down.
void Reader::onDisconnected()
{
}

/// Default hook: read errors are ignored.
void Reader::onError(error_code)
{
}

/// Default hook: received data is discarded.
void Reader::onDataReceived(std::vector<byte> /*data*/)
{
}
} // namespace pal::tcp
#pragma once
#include <atomic>
#include <limits>
#include <memory>
#include <pal/core/error.hpp>
#include <pal/core/global.hpp>
#include <pal/core/log.hpp>
#include <pal/core/tcp/tcp-defs.hpp>
#include <pal/core/type/enum.hpp>
/**
* @file tcp-reader.hpp
* An asynchronous tcp reader base class
*/
namespace pal::tcp {
/// Short name for the asio namespace
namespace net = asio;
/// The TCP protocol type
using tcp = net::ip::tcp;
/// Error code type used throughout this file
using error_code = std::error_code;
/**
* These types are defined explicitly here to avoid using the polymorphic
* executor used by default with recent ASIO releases, and which incurs non
* negligible slowdowns.
*/
/// Concrete executor type of an io_context
using ExecutorType = net::io_context::executor_type;
/// Stream socket bound to ExecutorType
using TcpSocket = net::basic_stream_socket<tcp, ExecutorType>;
/// Socket acceptor bound to ExecutorType
using TcpAcceptor = net::basic_socket_acceptor<tcp, ExecutorType>;
/// Name resolver bound to ExecutorType
using TcpResolver = net::ip::basic_resolver<tcp, ExecutorType>;
/**
* The states a client may have.
*
* Declared through PAL_DECLARE_ENUM with State as the enum name and int as
* its second argument (presumably the underlying type; see
* pal/core/type/enum.hpp to confirm).
*/
PAL_DECLARE_ENUM(State, int,
Disconnected, ///< Not connected, no attempt in progress
Resolving, ///< Resolving the server ip
Connecting, ///< Connecting to the server
Connected, ///< Connected to the server
Disconnecting ///< Disconnecting from the server
)
/**
* Simple asynchronous TCP reader base class, used to read data from a tcp socket.
* All operations are performed asynchronously on a thread owned by the reader.
* A connection attempt is retried until it either succeeds or the connection
* strategy (the configured number of retries) fails.
*
* Derived classes receive events through the protected virtual hooks.
*/
class PAL_CORE_EXPORT Reader {
public:
/**
* Client configuration settings
*/
struct Config {
Config() noexcept {}
uint32 conn_timeout_min = 1000; ///< Minimum connection timeout in ms
uint32 conn_timeout_max = 20'000; ///< Maximum connection timeout in ms
double conn_backoff_rate = 1.0; ///< Exponential backoff rate, 1.0 means constant
int32 conn_retries = INT32_MAX; ///< Number of connection retries
};
public:
/// Destructor, disconnects and stops the io thread
virtual ~Reader();
/// The associated executor
ExecutorType getExecutor();
/**
* Connect to a server asynchronously.
*
* Any connection or attempt in progress is stopped first, and the retry
* count and timeout restart from their configured initial values.
*
* The function returns immediately.
*
* @param addr Host name or address of the server
* @param port TCP port of the server
*/
void connect(const std::string &addr, uint16 port);
/// Disconnect from the server
void disconnect();
/// Test whether we are connected
bool isConnected() const;
/// Message Logger
log::LoggerPtr logger() const;
/// Server address
const std::string& address() const;
/// Server port
uint16 port() const;
protected:
/**
* Create a new Tcp Reader and start its io thread.
*
* @param name The name of the Reader, for logging purpose
* @param conf The configuration of the reader
*/
explicit Reader(const std::string &name, Config conf);
/// Logger
log::LoggerPtr m_logger;
/**
* Method called once the client is connected to the server
*/
virtual void onConnected();
/**
* Method called when the client is disconnected from the server
*/
virtual void onDisconnected();
/**
* Method called when a data reception error occurs
*/
virtual void onError(error_code);
/**
* Method called when data is received
*
* @param data The bytes received since the previous call
*/
virtual void onDataReceived(std::vector<byte> data);
private:
/// Start a connection attempt (name resolution first)
void start();
/// Close the socket and cancel pending operations
void stop();
/// Spawn the thread running the io_context
void startThread();
/// Stop the io_context and join the thread
void stopThread();
/// Arm the deadline timer with the current timeout
void startTimer();
/// Cancel the deadline timer
void stopTimer();
/// Apply the backoff to the timeout and use up one retry
void updateTimeout();
/// Connect the socket to one of the resolved endpoints
void startConnect(const TcpResolver::results_type &endpoints);
/// Completion handler of the deadline timer
void checkDeadline(error_code ec);
/// Forward a read error to onError() unless it was intentional
void signalReadError(error_code ec, std::string msg);
/// Read loop feeding onDataReceived()
void asyncReadSome();
// NOTE(review): parseMessages() and consume() have no definition in the
// accompanying implementation file; confirm whether they are still needed.
void parseMessages();
void consume(size_t n);
private:
// The constructor initializes these members in declaration order; several
// of them are built from m_ioc, so m_ioc has to stay ahead of them.
const Config m_conf; ///< Connection settings
net::io_context m_ioc; ///< Event loop executing all handlers
net::executor_work_guard<ExecutorType> m_work; ///< Work guard on m_ioc, released in stopThread()
TcpSocket m_socket; ///< The connection to the server
std::atomic<State> m_state; ///< Current connection state
net::steady_timer m_deadline; ///< Deadline of the resolve / connect operations
TcpResolver m_resolver; ///< Resolves the server address
std::string m_server_address; ///< Address given to connect()
uint16 m_server_port; ///< Port given to connect()
std::atomic<int32> m_retries; ///< Remaining connection attempts
std::atomic<uint32> m_timeout; ///< Current timeout in ms
std::atomic<bool> m_connected; ///< Set once connected, cleared by disconnect()
std::unique_ptr<std::thread> m_thread; ///< Thread running m_ioc
std::vector<byte> m_buf; ///< Reception buffer
net::strand<ExecutorType> m_read_strand; ///< Strand the read handlers are bound to
};
} // namespace pal::tcp
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment