Skip to content

Instantly share code, notes, and snippets.

@TheBurnDoc
Last active December 26, 2015 02:49
Show Gist options
  • Save TheBurnDoc/7081874 to your computer and use it in GitHub Desktop.
Save TheBurnDoc/7081874 to your computer and use it in GitHub Desktop.
This Gist shows an example C++ TCP client/server transport architecture using raw sockets. The server listens on a separate thread and utilises the select call, which is arguably old and out of date, but it is well supported and serves its purpose in this example. This Gist depends on a custom thread class (or the pthreads one in one of my other…
#include <cassert>
#include <cstdlib>
#include <cstring>
#include <cerrno>
#include <string>
#include <sstream>
#include <stdexcept>
#include <sys/socket.h>
#include <unistd.h>
#include <netdb.h>
#include <fcntl.h>
#include "transport.h"
#include "tcp.h"
// //
// SERVER //
// //
tcp_server::tcp_server(const std::string id, unsigned int port, payload_codec& pl) :
tcp_transport(id, port, pl), _listen_thread(this), _listen_fd(0)
{
}
void tcp_server::bring_up()
{
if (is_up()) throw std::logic_error("Already up");
addrinfo hints;
std::memset(&hints, '0', sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
hints.ai_flags = AI_PASSIVE;
addrinfo* serv_addr;
std::stringstream port;
port << get_port();
if (int ret = getaddrinfo(nullptr, port.str().c_str(), &hints, &serv_addr) == -1)
throw std::runtime_error(gai_strerror(ret));
if ((_listen_fd = socket(serv_addr->ai_family, serv_addr->ai_socktype, serv_addr->ai_protocol)) == -1)
throw std::runtime_error(strerror(errno));
if(bind(_listen_fd, serv_addr->ai_addr, serv_addr->ai_addrlen) == -1)
throw std::runtime_error(strerror(errno));
if(listen(_listen_fd, 0) == -1)
throw std::runtime_error(strerror(errno));
// Reusable
int optval = 1;
if(setsockopt(_listen_fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(int)) == -1)
throw std::runtime_error(strerror(errno));
// Non-blocking
int flags = fcntl(_listen_fd, F_GETFL, 0);
if(fcntl(_listen_fd, F_SETFL, flags | O_NONBLOCK) == -1)
throw std::runtime_error(strerror(errno));
freeaddrinfo(serv_addr);
_up = true;
_listen_thread.start();
}
void tcp_server::tear_down()
{
if (!is_up()) throw std::logic_error("Already down");
_listen_thread.tear_down();
_listen_thread.join();
_up = false;
}
// //
// Listener Thread //
// //
tcp_listener::tcp_listener(tcp_server* serv) :
thread(), _server(serv), _listen(true)
{
FD_ZERO(&_all_fds);
}
void tcp_listener::thread_main()
{
assert(_server->_listen_fd != 0);
fd_set read_fds;
int i_fd, new_fd;
ssize_t nbytes, nbytes_tail;
size_t recv_len = 1024;
char* recv_buff = static_cast<char*>(std::malloc(recv_len));
// Add listener to fd_set
FD_ZERO(&read_fds);
FD_SET(_server->_listen_fd, &_all_fds);
_fd_max = _server->_listen_fd;
// Listen loop
while(_listen)
{
read_fds = _all_fds;
if ((select(_fd_max + 1, &read_fds, nullptr, nullptr, nullptr)) == -1)
{
std::cout << "select failed: " << strerror(errno);
return;
}
// Iterate through file descriptors
for (i_fd = 0; i_fd <= _fd_max; i_fd++)
{
if (FD_ISSET(i_fd, &read_fds))
{
// Listener socket
if (i_fd == _server->_listen_fd)
{
// Check for new connections
new_fd = accept(_server->_listen_fd, static_cast <sockaddr*>(NULL), NULL);
if(new_fd > 0)
{
std::cout << "Transport '" << _server->get_id()
<< "' accepted new client connection on port " << _server->get_port();
FD_SET(new_fd, &_all_fds);
if (new_fd > _fd_max)
_fd_max = new_fd;
}
else if(new_fd == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
std::cout << "accept failed: " << strerror(errno);
}
// Client sockets
else
{
// First read
if((nbytes = recv(i_fd, static_cast<void*>(recv_buff), recv_len, 0)) <= 0)
{
if(nbytes == 0)
{
// Orderly shutdown
std::cout << "Transport '" << _server->get_id()
<< "' received orderly client disconnection";
FD_CLR(i_fd, &_all_fds);
close(i_fd);
continue;
}
else if(nbytes == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
{
// Some other error
std::cout << "recv failed: " << strerror(errno);
FD_CLR(i_fd, &_all_fds);
close(i_fd);
continue;
}
}
// Get header
tcp_header* hdr = reinterpret_cast<tcp_header*>(recv_buff);
hdr->payload_len = ntohl(hdr->payload_len);
// Sanity check payload type (A for now)
if (hdr->payload_char != 'A')
std::cout << "Payload char check failed (incoming char '" << hdr->payload_char
<< "', expected char '" << 'A' << "')";
// Get rest of packet if needed
while (nbytes < static_cast<ssize_t>(hdr->payload_len))
{
recv_buff = static_cast<char*>(std::realloc(recv_buff, recv_len *= 2));
nbytes_tail += recv(i_fd, static_cast<void*>(recv_buff + nbytes), recv_len - nbytes, 0);
if (nbytes_tail <= 0) break;
nbytes += nbytes_tail;
}
// Send payload for decoding
packet pkt = _server->get_codec().decode(
&recv_buff[sizeof(tcp_header)], nbytes - sizeof(tcp_header));
// At this point we have a packet, so we would do something with it, for example
// if we had some kind of callback handler we might do the following:
//_server->get_cb().process(pkt);
}
}
}
}
// Cleanup
for(int i_fd = 0; i_fd <= _fd_max; i_fd++)
if (FD_ISSET(i_fd, &_all_fds)) close(i_fd);
FD_ZERO(&_all_fds);
}
// //
// CLIENT //
// //
tcp_client::tcp_client(const std::string& id, const std::string& host, unsigned int port,
payload_codec& pl) : tcp_transport(id, port, pl), _host(host), _conn_fd(0)
{
}
void tcp_client::bring_up()
{
if (is_up()) throw std::logic_error("Already up");
addrinfo hints;
std::memset(&hints, '0', sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
addrinfo* serv_addr;
std::stringstream port;
port << get_port();
if (int ret = getaddrinfo(_host.c_str(), port.str().c_str(), &hints, &serv_addr) == -1)
throw std::runtime_error(gai_strerror(ret));
if ((_conn_fd = socket(serv_addr->ai_family, serv_addr->ai_socktype, serv_addr->ai_protocol)) == -1)
throw std::runtime_error(strerror(errno));
if (connect(_conn_fd, serv_addr->ai_addr, serv_addr->ai_addrlen) == -1)
throw std::runtime_error(strerror(errno));
freeaddrinfo(serv_addr);
_up = true;
}
void tcp_client::tear_down()
{
if (!is_up()) throw std::logic_error("Already down");
close(_conn_fd);
_up = false;
}
void tcp_client::send(char* buff, size_t len)
{
if (!is_up()) throw std::logic_error("Transport is down, send not possible");
char* send_buff = static_cast<char*>(std::malloc(len + sizeof(tcp_header)));
std::memcpy(static_cast<void*>(&send_buff[sizeof(tcp_header)]), static_cast<void*>(buff), len);
tcp_header* hdr = reinterpret_cast<tcp_header*>(send_buff);
hdr->payload_char = 'A';
hdr->payload_len = htonl(len);
::send(_conn_fd, send_buff, len + sizeof(tcp_header), 0);
}
#pragma once
#include <string>
#include <sys/select.h>
#include "thread.h"
#include "transport.h"
// sizeof(tcp_header) == 16
struct tcp_header
{
char payload_char;
size_t payload_len;
char padding[11];
};
class tcp_transport : public transport
{
public:
explicit tcp_transport(const std::string id, unsigned int port, payload_codec& pl) :
transport(id, pl), _up(false), _port(port) {}
virtual void bring_up() = 0;
virtual void tear_down() = 0;
bool is_up() const { return _up; }
unsigned int get_port() const { return _port; }
protected:
bool _up;
private:
unsigned int _port;
};
class tcp_server;
class tcp_listener : public thread
{
public:
tcp_listener(tcp_server* serv);
~tcp_listener() { tear_down(); }
void tear_down() { _listen = false; }
private:
void thread_main();
tcp_server* _server;
bool _listen;
fd_set _all_fds;
int _fd_max;
};
class tcp_server : public tcp_transport
{
public:
explicit tcp_server(const std::string id, unsigned int port, payload_codec& pl);
~tcp_server() { tear_down(); }
void bring_up();
void tear_down();
private:
friend class tcp_listener;
tcp_listener _listen_thread;
int _listen_fd;
};
class tcp_client : public tcp_transport
{
public:
tcp_client(const std::string& id, const std::string& host, unsigned int port, payload_codec& pl);
~tcp_client() { tear_down(); }
void bring_up();
void tear_down();
void send(char* buf, size_t len);
const std::string& get_host() const { return _host; }
private:
std::string _host;
int _conn_fd;
};
#pragma once
#include "thread.h" // The thread class (in a seperate Gist)
#include "packet.h" // The packet class (not in this Gist)
class payload_codec
{
public:
virtual ~payload_codec() {}
virtual std::vector<char> encode(const packet& pkt) const = 0;
virtual packet decode(const char* buffer, size_t buffer_len) const = 0;
};
class transport
{
public:
transport(const std::string& id, payload_codec& pl) : _id(id), _codec(pl) {}
virtual void bring_up () = 0;
virtual void tear_down() = 0;
virtual bool is_up() const = 0;
const std::string& get_id() const { return _id; }
virtual ~transport() {}
protected:
const payload_codec& get_codec() const { return _codec; }
private:
std::string _id;
const payload_codec& _codec;
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment