Skip to content

Instantly share code, notes, and snippets.

@jammerxd
Last active September 17, 2021 21:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jammerxd/944692e52ffae328c5d791f9fc879477 to your computer and use it in GitHub Desktop.
Save jammerxd/944692e52ffae328c5d791f9fc879477 to your computer and use it in GitHub Desktop.
Boost Asio Client Server
// Client-Test.cpp : This file contains the 'main' function. Program execution begins and ends there.
//
#include "prerequisites.h"
// Message identifiers used by the client test program.
// The value is stored in MessageHeader::id and sent over the wire as raw bytes,
// so the enumerator order must stay identical to the copy of this enum in
// Server-Test.cpp.
enum class MessageTypes : uint32_t
{
// Handled in MyClient::OnMessage: the body carries the id the server assigned to this connection.
ServerAccept,
ServerDeny,
ServerPing,
// Used by the server's periodic broadcast (timedBcast).
MessageAll,
// Text payload; the server echoes these back to the sender.
SendText,
// NOTE(review): the remaining values are not referenced by the code visible here.
ServerMessage,
ServerMessage1,
ServerMessage2,
ServerMessage3,
ServerMessage4,
ServerMessage5,
ServerMessage6,
ServerMessage7,
ServerMessage8,
ServerMessage9,
};
// Reads one line from standard input (blocking) and returns it without the
// line terminator. Returns an empty string when nothing could be read.
std::string ReadLine()
{
    std::string line;
    std::getline(std::cin, line);
    return line;
}
/*std::condition_variable cv;
std::mutex mutex;
std::mutex exitMutex;
std::condition_variable ecv;
int wait_time, num_msgs, msg_size, delay_between_msgs;
std::string ReadLine()
{
std::string str = "";
std::getline(std::cin, str);
return str;
}
class MyClient : public Client<MessageTypes>
{
public:
MyClient() : Client<MessageTypes>()
{
isExiting = false;
isFirst = true;
}
virtual void OnDisconnect(boost::shared_ptr<Connection<MessageTypes>> client) {
std::stringstream ss;
ss << "Disconnected.";
std::cout << ss.str() << std::endl;
isExiting = true;
cv.notify_all();
}
virtual void OnMessage(OwnedMessage<MessageTypes>& message) {
if (message.msg.message_header.id == MessageTypes::ServerAccept)
{
int id;
message.msg >> id;
this->SetId(id);
std::stringstream ss;
ss << "[ SERVER ] SENT ID: " << id;
std::cout << ss.str() << std::endl;
ss.str("");
t1 = boost::thread(&MyClient::SendMessages, this);
t1.detach();
//cv.notify_all();
}
else if (message.msg.message_header.id == MessageTypes::SendText)
{
std::stringstream ss;
ss << "[ SERVER ] SENT MESSAGE " << message.msg.size() << " bytes long.";
std::cout << ss.str() << std::endl;
ss.str("");
}
else if (message.msg.message_header.id == MessageTypes::MessageAll)
{
std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now();
time_taken = std::chrono::duration_cast<std::chrono::milliseconds>(now - time_point).count();
std::string s = message.msg.GetString(0);
//std::stringstream ss;
//std::string ms = s.substr(s.find_last_of(' '));
//ss << "[ SERVER ] BROADCASTED MESSAGE " << ms;
//std::cout << ss.str() << std::endl;
//ss.str("");
//s = "";
if (isFirst)
{
isFirst = false;
}
else
{
if (time_taken > 55)
{
std::cout << "MESSAGE WAS DELAYED (" << std::to_string(time_taken) << "ms)" << std::endl;
}
}
time_point = std::chrono::high_resolution_clock::now();
}
}
virtual void OnConnect()
{
std::cout << "[ DEBUG ] Thread Id: " << std::this_thread::get_id() << std::endl;
}
virtual void OnMessageSent(Message<MessageTypes>& msg)
{
std::cout << "Message sent" << std::endl;
cv.notify_all();
}
std::chrono::high_resolution_clock::time_point time_point;
long long time_taken=0;
bool isFirst;
boost::thread t1;
std::condition_variable cv;
std::mutex mutex;
bool isExiting;
~MyClient() {
isExiting = true;
Disconnect();
t1.interrupt();
if (t1.joinable())
{
std::cout << "waiting for join" << std::endl;
t1.join();
}
}
private:
void SendMessages()
{
srand(time(NULL));
num_msgs = rand() % 10 + 1;
wait_time = rand() % 30 + 1;
num_msgs = 0;
char letters[10] = { 'a','b','c','d','e','f','g','h','i','j' };
for (int i = 0; i < num_msgs; i++)
{
if (this->IsConnected())
{
msg_size = rand() % 512000 + 409600;
delay_between_msgs = rand() % 10 + 1;
OwnedMessage<MessageTypes> msg;
msg.msg.message_header.id = MessageTypes::SendText;
std::string data = std::string(msg_size, letters[i]);
msg.msg.Append(data);
std::cout << "Sending " << i + 1 << " of " << num_msgs << " | Thread Id: " << std::this_thread::get_id() << std::endl;
boost::thread thread2(&MyClient::Send, this, msg.msg);
thread2.detach();
std::cout << "Sent " << i + 1 << " of " << num_msgs << " | Using thread Id: " << thread2.get_id() << std::endl;
data = "";
std::cout << "Sleeping for " << delay_between_msgs * 1000 << std::endl;
}
else
{
std::cout << "Not connected. Exiting thread." << std::endl;
cv.notify_all();
ecv.notify_all();
return;
}
if (this->IsConnected())
{
boost::this_thread::sleep_for(boost::chrono::milliseconds(delay_between_msgs * 1000));
}
else
{
std::cout << "Not connected. Exiting thread." << std::endl;
cv.notify_all();
ecv.notify_all();
return;
}
}
ecv.notify_all();
}
};
int main()
{
wait_time = 0;
num_msgs = 0;
msg_size = 0;
std::string host = "localhost";
uint16_t port = 40000;
MyClient c;
//ReadLine();
std::cout << "Connecting..." << std::endl;
c.Connect(host, port);
//std::unique_lock<std::mutex> lock(mutex);
//cv.wait(lock);
//Sleep(wait_time * 1000);
ReadLine();
//std::unique_lock<std::mutex> lock(exitMutex);
//ecv.wait(lock);
c.Disconnect();
return 0;
}
std::vector<MyClient*> clients;
int main()
{
std::cout << "Press enter to begin..." << std::endl;
ReadLine();
std::string host = "localhost";
uint16_t port = 40000;
for (int i = 0; i < 50; i++)
{
MyClient* c =new MyClient();
c->Connect(host, port);
clients.push_back(c);
}
ReadLine();
std::for_each(
std::execution::par_unseq,
clients.begin(),
clients.end(),
[](auto&& item)
{
item->Disconnect();
delete item;
});
std::cout << "DONE" << std::endl;
return 0;
}
*/
// Process-wide state of the client test program.
// NOTE(review): of these, only `ctx` is referenced by the active client code
// below; `stop`, `previous_time`, `messageCount` and `timer` mirror the globals
// of Server-Test.cpp and appear to be unused here - confirm before removing.
std::atomic_bool stop = false;
std::chrono::high_resolution_clock::duration previous_time;
// Thread pool whose executor is handed to MyClient in main().
boost::asio::thread_pool ctx;
int messageCount;
// Timer bound to the pool above.
boost::asio::high_resolution_timer timer(ctx);
// Minimal test client: connects through the Client<> base class and records
// the connection id the server sends in its ServerAccept message.
class MyClient : public Client<MessageTypes,boost::asio::any_io_executor>
{
public:
    // executor: where all asynchronous socket work of this client is run.
    explicit MyClient(boost::asio::any_io_executor executor)
        : Client<MessageTypes, boost::asio::any_io_executor>(executor)
    {
    }

    // No reaction to a lost connection in this test program.
    void OnDisconnect(ConnPtr<MessageTypes, boost::asio::any_io_executor> const& /*client*/) override
    {
    }

    // Called for every complete message received from the server.
    void OnMessage(MsgPtr<MessageTypes> const& message,
                   ConnPtr<MessageTypes, boost::asio::any_io_executor> const& /*client*/) override
    {
        if (message->message_header.id == MessageTypes::ServerAccept)
        {
            // The server stores the id it assigned to us at the start of the body.
            const int id = message->GetInt(0);
            SetId(id);
            // Terminate and flush the line; previously the text stayed in the
            // stream buffer and ran into whatever was printed next.
            std::cout << "Received ID " << id << std::endl;
        }
    }

    // No action needed once the TCP connection is established.
    void OnConnect(ConnPtr<MessageTypes, boost::asio::any_io_executor> const& /*client*/) override
    {
    }

    // No action needed after a message has been written to the socket.
    void OnMessageSent(MsgPtr<MessageTypes> const& /*message*/,
                       ConnPtr<MessageTypes, boost::asio::any_io_executor> const& /*client*/) override
    {
    }

    // Disconnect while this object's overrides are still alive.
    ~MyClient() {
        Disconnect();
    }
};
int main()
{
MyClient* c;
c = new MyClient(ctx.get_executor());
c->Connect("127.0.0.1", 40000);
ReadLine();
c->Disconnect();
delete c;
return 0;
}
#pragma once
#include "prerequisites.h"
template <typename MsgId,typename Executor>
class Client
{
protected:
virtual void OnDisconnect(ConnPtr<MsgId, Executor> const& client) {
}
virtual void OnMessage(MsgPtr<MsgId> const& message, ConnPtr<MsgId, Executor> const& client) {
}
virtual void OnConnect(ConnPtr<MsgId, Executor> const& client)
{
}
virtual void OnMessageSent(MsgPtr<MsgId> const& message, ConnPtr<MsgId, Executor> const& client )
{
}
public:
bool Connect(const std::string& host, const uint16_t port)
{
if (IsConnected())
{
Disconnect();
}
try
{
// Resolve hostname/ip-address into tangiable physical address
boost::asio::ip::tcp::resolver resolver(executor_);
boost::asio::ip::tcp::resolver::results_type endpoints = resolver.resolve(host, std::to_string(port));
// Create connection
connection = conn_t<MsgId, Executor>::create(
executor_, 0,
boost::bind(&Client<MsgId, Executor>::client_message, this, boost::placeholders::_1, boost::placeholders::_2),
boost::bind(&Client<MsgId, Executor>::message_sent, this, boost::placeholders::_1, boost::placeholders::_2),
boost::bind(&Client<MsgId, Executor>::client_disconnected, this, boost::placeholders::_1)
);
// Tell the connection object to connect to server
//this->connection->ConnectToServer(endpoints);
boost::asio::async_connect(connection->socket(), endpoints,
[this](std::error_code ec, boost::asio::ip::tcp::endpoint endpoint)
{
if (!ec)
{
connection->accepted();
OnConnect(connection);
}
});
}
catch (std::exception& e)
{
std::cerr << "Client Exception: " << e.what() << std::endl;
return false;
}
return true;
}
void Disconnect()
{
if (IsConnected())
{
connection->Disconnect(true,true,true);
}
}
~Client()
{
Disconnect();
}
Client(Executor executor):executor_(executor){
}
bool IsConnected()
{
return !connection->IsInvalid();
}
void Send(MsgPtr<MsgId> msg)
{
if (IsConnected())
{
connection->Send(std::move(msg));
}
}
void client_disconnected(ConnPtr<MsgId, Executor> const& client)
{
OnDisconnect(client);
}
void client_message(MsgPtr<MsgId> const& message, ConnPtr<MsgId, Executor> const& client)
{
OnMessage(message,client);
}
void message_sent(MsgPtr<MsgId> const& message, ConnPtr<MsgId, Executor> const& client)
{
OnMessageSent(message,client);
}
void SetId(int id)
{
connection->SetId(id);
}
private:
ConnPtr<MsgId, Executor> connection;
Executor executor_;
strand_t<Executor> strand_ = boost::asio::make_strand(executor_);
};
#pragma once
#include "prerequisites.h"
// One TCP connection that exchanges Message<MsgId> objects with a peer.
// Wire format: a raw MessageHeader<MsgId> followed by header.size body bytes.
//
// Instances must be owned by a std::shared_ptr (use create()): every pending
// asynchronous operation keeps the object alive through shared_from_this().
//
// The three callbacks are invoked from the threads of the socket's executor.
template <typename MsgId,typename Executor>
class Connection : public std::enable_shared_from_this<Connection<MsgId,Executor>>
{
public:
    // Factory: builds a shared_ptr-owned connection on `executor`.
    // id is the initial connection id; the handlers may be empty.
    static ConnPtr<MsgId, Executor> create(
        Executor executor,int id,
        ClientMessageCallbackType<MsgId,Executor> message_handle,
        ClientMessageSentCallbackType<MsgId,Executor> messagesent_handle,
        ClientDisconnectCallbackType<MsgId,Executor> disconnect_handle) {
        return std::make_shared<conn_t<MsgId, Executor>>(
            std::move(executor), id,
            std::move(message_handle),
            std::move(messagesent_handle),
            std::move(disconnect_handle));
    }

    // Underlying socket, for async_accept / async_connect by the owner.
    // Returns the socket's real type so the class also works for executors
    // other than any_io_executor (for which this equals tcp::socket).
    socket_t<Executor>& socket() {
        return socket_;
    }

    // To be called once the socket is connected: disables Nagle's algorithm
    // and starts the read loop.
    void accepted() {
#ifdef VERBOSE_SERVER_DEBUG
        std::stringstream ss;
        ss << "[ Client " << this->GetId() << " ] " << "Connected.";
        std::cout << ss.str() << std::endl;
#endif
        boost::asio::ip::tcp::no_delay option(true);
        socket_.set_option(option);
        ReadHeader();
    }

    // The id is atomic, so it can be set directly from any thread. The former
    // implementation posted a lambda holding a raw `this`, which could run
    // after the connection was destroyed.
    void SetId(int id)
    {
        connectionId = id;
    }

    int GetId() const {
        return connectionId;
    }

    // Tears the connection down at most once and then notifies the disconnect
    // handler. cancel / shutdown / close select which socket steps are taken.
    // Always leaves the connection marked invalid.
    void Disconnect(bool cancel,bool shutdown, bool close) {
        if (socket_.is_open() && !alreadyDisconnected.exchange(true))
        {
            // Non-throwing overloads: this function is also reached from the
            // destructor, where an escaping exception would terminate.
            if (cancel)
            {
                boost::system::error_code ec;
                socket_.cancel(ec);
                if (ec)
                {
                    std::cout << "[ Client " << GetId() << " ] Cancel Error: " << ec.value() << " - " << ec.message() << std::endl;
                }
            }
            // Queued messages are not cleared here: only the active write
            // chain may touch the front of the queue. It drops whatever is
            // left once it observes the invalid state (see WriteMessage).
            if (shutdown)
            {
                boost::system::error_code ec;
                socket_.shutdown(boost::asio::socket_base::shutdown_both, ec);
                if (ec)
                {
                    std::cout << " [ Client " << GetId() << " ] Shutdown Error: " << ec.value() << " - " << ec.message() << std::endl;
                }
            }
            if (close)
            {
                // Close even when shutdown failed (commonly "not connected"
                // after the peer went away); otherwise the descriptor and any
                // pending operations would linger.
                boost::system::error_code ec;
                socket_.close(ec);
                if (ec)
                {
                    std::cout << "[ Client " << GetId() << " ] Close Error: " << ec.value() << " - " << ec.message() << std::endl;
                }
            }
#ifdef VERBOSE_SERVER_DEBUG
            std::stringstream ss;
            ss << "[ Client " << this->GetId() << " ] " << "Disconnected.";
            std::cout << ss.str() << std::endl;
#endif
            if (disconnect_handler)
            {
                // weak_from_this() yields null while the object is being
                // destroyed; shared_from_this() would throw bad_weak_ptr there
                // and, coming from the destructor, terminate the program.
                if (auto self = this->weak_from_this().lock())
                {
                    disconnect_handler(self);
                }
            }
        }
        invalidState = true;
    }

    // True when the socket is closed or an error/disconnect has been recorded.
    bool IsInvalid() const {
        return !socket_.is_open() || invalidState;
    }

    // Queues msg for transmission. Messages are written one at a time in FIFO
    // order; may be called from any thread. Null messages are ignored.
    void Send(MsgPtr<MsgId> msg)
    {
        if (!msg)
        {
            return;
        }
        qMessagesOut.push_back(std::move(msg));
        // Start a write chain only when none is active. The former
        // "count() == 1" test also fired while a write was still in flight
        // (the in-flight message had already been popped), which produced
        // overlapping async_write calls on the same socket.
        if (!writeInProgress.exchange(true))
        {
            boost::asio::post(socket_.get_executor(),
                [this, self = this->shared_from_this()] { WriteMessage(); });
        }
    }

    ~Connection() {
        Disconnect(true,true,true);
    }

    Connection(Executor executor, int id,
        ClientMessageCallbackType<MsgId, Executor> message_handle,
        ClientMessageSentCallbackType<MsgId, Executor> messagesent_handle,
        ClientDisconnectCallbackType<MsgId, Executor> disconnect_handle) :
        // Same order as the member declarations below.
        disconnect_handler(std::move(disconnect_handle)),
        message_handler(std::move(message_handle)),
        messagesent_handler(std::move(messagesent_handle)),
        socket_(std::move(executor)),
        connectionId(id)
    {
    }

private:
    ClientDisconnectCallbackType<MsgId, Executor> disconnect_handler;
    ClientMessageCallbackType<MsgId, Executor> message_handler;
    ClientMessageSentCallbackType<MsgId, Executor> messagesent_handler;

    // Evaluates the result of an asynchronous operation. On error the
    // connection is disconnected and false is returned; otherwise true.
    bool Report([[maybe_unused]] std::string_view caption, std::error_code ec)
    {
        if (ec) {
#ifdef VERBOSE_SERVER_DEBUG
            std::cout << "[ Client " << GetId() << " ] " << caption
                << " Failed"
                << ec.value() << " - " << ec.message()
                << std::endl;
#endif
            invalidState = true;
            Disconnect(true, true, true);
            return false;
        }
#ifdef VERBOSE_SERVER_DEBUG
        // The former success branch expanded a parameter pack named `what`
        // that no longer exists, so verbose builds did not compile.
        std::cout << "[ Client " << GetId() << " ] " << caption << " Success"
            << " " << ec.value() << " - " << ec.message() << std::endl;
#endif
        return true;
    }

    // Reads the fixed-size header of the next message, then its body.
    void ReadHeader() {
        boost::asio::async_read(socket_, boost::asio::buffer(&tempInMsg.message_header, sizeof(tempInMsg.message_header)),
            [this, self = this->shared_from_this()](std::error_code ec, [[maybe_unused]] std::size_t length)
            {
                if (Report("Read Header", ec)) {
                    // NOTE(review): the size comes straight from the peer and
                    // is not bounded here; consider enforcing a maximum.
                    tempInMsg.body.resize(tempInMsg.message_header.size);
                    ReadBody();
                }
            });
    }

    // Reads header.size body bytes, delivers the message, then waits for the
    // next header.
    void ReadBody() {
        boost::asio::async_read(socket_, boost::asio::buffer(tempInMsg.body),
            [this, self = this->shared_from_this()](std::error_code ec, [[maybe_unused]] std::size_t length)
            {
                if (Report("Read Body", ec)) {
                    CommitIncoming();
                    // Go back to waiting for a header
                    ReadHeader();
                }
            });
    }

    // Writes the next queued message and re-arms itself from the completion
    // handler until the queue is drained.
    // Precondition: the caller owns writeInProgress (it is true), which makes
    // this the only code that pops from or clears the queue.
    void WriteMessage()
    {
        if (IsInvalid())
        {
            // Connection is gone: drop whatever is still queued.
            qMessagesOut.clear();
            writeInProgress = false;
            return;
        }
        if (qMessagesOut.empty())
        {
            writeInProgress = false;
            // A producer may have pushed after the empty() check but before
            // the flag was released and therefore not started a chain itself.
            // Re-check, and continue only if we win the flag back.
            if (qMessagesOut.empty() || writeInProgress.exchange(true))
            {
                return;
            }
        }
        auto message = qMessagesOut.pop_front();
        // The handler's copy of `message` keeps both buffers alive until the
        // write completes.
        std::vector buffers
        {
            boost::asio::buffer(&message->message_header, sizeof(MessageHeader<MsgId>)) ,
            boost::asio::buffer(message->body)
        };
        boost::asio::async_write(socket_, buffers,
            [this, self = this->shared_from_this(), message](std::error_code ec, [[maybe_unused]] std::size_t length)
            {
                if (Report("WriteMessage", ec))
                {
                    if (messagesent_handler)
                    {
                        messagesent_handler(message, self);
                    }
                    WriteMessage();
                }
                else
                {
                    // Report() already disconnected; end the write chain.
                    qMessagesOut.clear();
                    writeInProgress = false;
                }
            });
    }

    // Hands the fully received message to the message handler and resets the
    // scratch message for the next read.
    void CommitIncoming()
    {
        if (message_handler)
        {
            message_handler(std::make_shared<Message<MsgId>>(std::move(tempInMsg)), this->shared_from_this());
        }
        tempInMsg.body.clear();
        tempInMsg.message_header.size = 0;
    }

    socket_t<Executor> socket_;
    std::atomic<int> connectionId;
    // Scratch message the read loop fills in.
    Message<MsgId> tempInMsg;
    // Messages waiting to be written, oldest first.
    ThreadSafeQueue<MsgPtr<MsgId>> qMessagesOut;
    std::atomic_bool invalidState{ false };
    std::atomic_bool alreadyDisconnected{ false };
    // True while a write chain (posted start or in-flight async_write) is active.
    std::atomic_bool writeInProgress{ false };
};
#pragma once
#include "prerequisites.h"
// A message: fixed header plus a byte vector body.
// Strings are stored in the body as a native size_t length followed by the
// raw characters; POD values are stored as their raw bytes.
// message_header.size is kept equal to body.size() by all mutating helpers.
template <typename MsgID>
struct Message
{
    // Number of bytes in the body.
    [[nodiscard]] size_t size() const { return body.size(); }

    MessageHeader<MsgID> message_header{};
    std::vector<unsigned char> body;
    std::string TransactionId;

    // Appends a NUL-terminated string (length prefix + characters).
    // A null pointer is treated as an empty string.
    void Append(const char* data)
    {
        AppendBytes(data, data != nullptr ? std::strlen(data) : 0);
    }

    // Appends a string (length prefix + characters). Uses data.size(), so
    // embedded NUL characters are preserved instead of truncating the string.
    void Append(std::string const& data)
    {
        AppendBytes(data.data(), data.size());
    }

    // Reads the length-prefixed string stored at `offset` into dst.
    // dst receives "ERROR" when the body does not hold a complete string there.
    // (The former signature took dst by const reference and could not write
    // to it; it also filled dst with one repeated character.)
    void GetString(size_t offset, std::string& dst) const
    {
        dst = GetString(offset);
    }

    // Returns the length-prefixed string stored at `offset`, or "ERROR" when
    // the body does not hold a complete string there.
    std::string GetString(size_t offset) const
    {
        size_t start = 0;
        size_t length = 0;
        if (!LocateString(offset, start, length))
        {
            return "ERROR";
        }
        const auto first = body.begin() + start;
        return std::string(first, first + length);
    }

    // Returns the int stored at `offset`, or 0 when fewer than sizeof(int)
    // bytes are available there.
    int GetInt(size_t offset) const
    {
        int value = 0;
        if (offset <= body.size() && body.size() - offset >= sizeof(int))
        {
            // Destination first, source second (the arguments were reversed).
            std::memcpy(&value, body.data() + offset, sizeof(int));
        }
        return value;
    }

    // Override for std::cout compatibility - produces friendly description of message
    friend std::ostream& operator << (std::ostream& os, const Message<MsgID>& msg)
    {
        os << "ID:" << int(msg.message_header.id) << " Size:" << msg.message_header.size;
        return os;
    }

    // Convenience operator overloads - these add and remove values from the
    // body vector as if it were a stack (last in, first out). They are
    // templates so any trivially copyable type can be pushed or popped.

    // Pushes any trivially copyable value onto the end of the body.
    template<typename DataType>
    friend Message<MsgID>& operator << (Message<MsgID>& msg, const DataType& data)
    {
        // memcpy is only defined for trivially copyable types
        static_assert(std::is_trivially_copyable<DataType>::value, "Data is too complex to be pushed into vector");
        // Current size of the vector is the point where the data is inserted
        const size_t i = msg.body.size();
        msg.body.resize(i + sizeof(DataType));
        std::memcpy(msg.body.data() + i, &data, sizeof(DataType));
        msg.message_header.size = static_cast<uint32_t>(msg.size());
        // Return the target message so it can be "chained"
        return msg;
    }

    // Pops a trivially copyable value from the end of the body.
    // Throws std::out_of_range when the body holds fewer than sizeof(DataType)
    // bytes (this used to underflow and read out of bounds).
    template<typename DataType>
    friend Message<MsgID>& operator >> (Message<MsgID>& msg, DataType& data)
    {
        static_assert(std::is_trivially_copyable<DataType>::value, "Data is too complex to be pulled from vector");
        if (msg.body.size() < sizeof(DataType))
        {
            throw std::out_of_range("Message: body too small for requested type");
        }
        // Location towards the end of the vector where the pulled data starts
        const size_t i = msg.body.size() - sizeof(DataType);
        std::memcpy(&data, msg.body.data() + i, sizeof(DataType));
        // Shrink the vector to remove the bytes just read
        msg.body.resize(i);
        msg.message_header.size = static_cast<uint32_t>(msg.size());
        // Return the target message so it can be "chained"
        return msg;
    }

private:
    // Appends a size_t length prefix followed by `length` bytes from `data`.
    void AppendBytes(const char* data, size_t length)
    {
        const size_t i = body.size();
        body.resize(i + sizeof(size_t) + length);
        std::memcpy(body.data() + i, &length, sizeof(size_t));
        if (length != 0)
        {
            std::memcpy(body.data() + i + sizeof(size_t), data, length);
        }
        message_header.size = static_cast<uint32_t>(size());
    }

    // Validates the length-prefixed string at `offset`. On success `start` is
    // the index of its first character and `length` its size. The checks are
    // written as subtractions so a hostile length cannot overflow them.
    bool LocateString(size_t offset, size_t& start, size_t& length) const
    {
        if (offset > body.size() || body.size() - offset < sizeof(size_t))
        {
            return false;
        }
        std::memcpy(&length, body.data() + offset, sizeof(size_t));
        start = offset + sizeof(size_t);
        return length <= body.size() - start;
    }
};
#pragma once
#include "prerequisites.h"
// Fixed-size header that precedes every message body.
// Connection reads and writes this struct as raw bytes
// (sizeof(MessageHeader<MsgId>)), so both peers must be built with the same
// layout for it.
template<typename MsgId>
struct MessageHeader
{
// Application-defined message identifier.
MsgId id{};
// Number of body bytes that follow the header.
uint32_t size = 0;
};
#pragma once
// Windows stuff.
#define _CRT_SECURE_NO_WARNINGS
#define NOMINMAX
//boost asio
#include <boost/asio.hpp>
#include <ctime>
#include <cstring>
#include <chrono>
#include <time.h>
#include <sstream>
#include <string>
#include <vector>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <stdarg.h>
#include <iostream>
#include <thread>
#include <future>
#include <algorithm>
#include <cstdlib>
#include <list>
#include <set>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <execution>
#ifdef _WIN32
#include <ShlObj.h>
#include <Shlwapi.h>
#endif
#include <iostream>
#include <ostream>
#include <sstream>
#include <errno.h>
#include <stdint.h>
#include <fstream>
#include <stdexcept>
#include <math.h>
#include <random>
#include <climits>
#include <memory>
#include <utility>
#include <queue>
#include <optional>
#include <boost/bind/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include "boost/date_time/posix_time/posix_time_types.hpp"
#include <boost/thread/mutex.hpp>
#include <boost/functional.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
// Forward declarations so the aliases below can be used by every header that
// is included at the end of this file.
template<typename MsgId, typename Executor>
class Connection;
template<typename MsgId>
struct Message;
template<typename MsgId, typename Executor>
class Server;
// Short name for the connection type.
template <typename MsgId, typename Executor>
using conn_t = Connection<MsgId, Executor>;
// Owning handle to a connection.
template <typename MsgId, typename Executor>
using ConnPtr = std::shared_ptr<conn_t<MsgId, Executor>>;
// Non-owning handle to a connection (the server's connection table stores these).
template <typename MsgId, typename Executor>
using WeakConnPtr = std::weak_ptr<conn_t<MsgId, Executor>>;
// Shared, read-only message: the same instance can be queued on several connections.
template <typename MsgId>
using MsgPtr = std::shared_ptr<Message<MsgId> const>;
// Base class name used by server implementations in their constructor initialiser.
template <typename MsgId, typename Executor>
using base_type = Server<MsgId, Executor>;
// TCP acceptor / socket / strand bound to a specific executor type.
template <typename Executor>
using acceptor_t = boost::asio::basic_socket_acceptor<boost::asio::ip::tcp, Executor>;
template <typename Executor>
using socket_t = boost::asio::basic_stream_socket<boost::asio::ip::tcp, Executor>;
template <typename Executor>
using strand_t = boost::asio::strand<Executor>;
// Callback signatures a Connection invokes on disconnect, on a received
// message, and after a message has been written.
template<typename MsgId, typename Executor>
using ClientDisconnectCallbackType = std::function<void(ConnPtr<MsgId, Executor> const&)>;
template<typename MsgId, typename Executor>
using ClientMessageCallbackType = std::function<void(MsgPtr <MsgId> const&, ConnPtr<MsgId, Executor> const&)>;
template<typename MsgId, typename Executor>
using ClientMessageSentCallbackType = std::function<void(MsgPtr <MsgId> const&, ConnPtr<MsgId, Executor> const&)>;
// Enables duration literals such as 30ms / 2us in every file including this header.
using namespace std::chrono_literals;
#include "MessageHeader.h"
#include "Message.h"
#include "ThreadSafeQueue.h"
#include "Connection.h"
#include "Server.h"
#include "Client.h"
// Server-Test.cpp : This file contains the 'main' function. Program execution begins and ends there.
//
#include "prerequisites.h"
// Message identifiers used by the server test program.
// The value is stored in MessageHeader::id and sent over the wire as raw bytes,
// so the enumerator order must stay identical to the copy of this enum in
// Client-Test.cpp.
enum class MessageTypes : uint32_t
{
// Sent from MyServer::OnClientConnect; the body carries the new connection's id.
ServerAccept,
ServerDeny,
ServerPing,
// Periodic broadcast produced by timedBcast().
MessageAll,
// Text payload from a client; MyServer::OnMessage echoes it back.
SendText,
// NOTE(review): the remaining values are not referenced by the code visible here.
ServerMessage,
ServerMessage1,
ServerMessage2,
ServerMessage3,
ServerMessage4,
ServerMessage5,
ServerMessage6,
ServerMessage7,
ServerMessage8,
ServerMessage9,
};
// Test server: greets every new client with its connection id and echoes
// SendText messages back to their sender.
class MyServer : public Server<MessageTypes,boost::asio::any_io_executor>
{
public:
    // ex: executor on which all asynchronous work runs; ep: endpoint to listen on.
    MyServer(boost::asio::any_io_executor ex, boost::asio::ip::tcp::endpoint ep) : base_type<MessageTypes, boost::asio::any_io_executor>(ex,ep)
    {
    }

    void OnClientDisconnect(ConnPtr<MessageTypes, boost::asio::any_io_executor> const& client) override
    {
        std::cout << "[ Client " << client->GetId() << " ] Disconnected" << std::endl;
    }

    // Accepts every client and sends it a ServerAccept message whose body is
    // the id the server assigned to the connection.
    bool OnClientConnect(ConnPtr<MessageTypes, boost::asio::any_io_executor> const& client) override
    {
        std::cout << "[ Client " << client->GetId() << " ] Connected" << std::endl;
        auto msg = std::make_shared<Message<MessageTypes>>();
        msg->message_header.id = MessageTypes::ServerAccept;
        (*msg) << client->GetId();
        client->Send(std::move(msg));
        return true;
    }

    // Echoes SendText messages back to the sender; other message types are ignored.
    void OnMessage(MsgPtr<MessageTypes> const& msg, ConnPtr<MessageTypes, boost::asio::any_io_executor> const& client) override
    {
        if (msg->message_header.id != MessageTypes::SendText)
        {
            // Previously an unterminated "[ Client N ] " prefix was printed
            // for these messages and ran into the next log line.
            return;
        }
        // The payload is not decoded here: the former code copied it into a
        // std::string only to discard it again.
        std::cout << "[ Client " << client->GetId() << " ] " << " Received Message" << std::endl;
        client->Send(msg);//fire it back to the client
    }

    void OnMessageSent([[maybe_unused]] MsgPtr<MessageTypes> const& msg, [[maybe_unused]] ConnPtr<MessageTypes, boost::asio::any_io_executor> const& client) override
    {
        // Intentionally quiet: logging every sent broadcast floods the console.
    }
};
// Process-wide state of the server test program.
// The server instance; created and deleted in main(), read by timedBcast().
MyServer* srv;
// Set by main() on shutdown; timedBcast() stops rescheduling once it is true.
std::atomic_bool stop = false;
// Duration of the last reported broadcast, used by timedBcast() to avoid
// printing the same timing twice in a row.
std::chrono::high_resolution_clock::duration previous_time;
// Thread pool providing the executor for the server and the broadcast timer.
boost::asio::thread_pool ctx;
// Sequence number appended to each broadcast; timedBcast() wraps it at 1000.
int messageCount;
// Timer that drives timedBcast().
boost::asio::high_resolution_timer timer(ctx);
// Timer handler: broadcasts one randomly sized MessageAll message to every
// client and re-arms the timer so broadcasts go out roughly every 30ms.
// e is the timer's completion status; the chain ends when the wait failed or
// was cancelled, when `stop` is set, or when no server exists.
void timedBcast(boost::system::error_code e)
{
    using clock = std::chrono::high_resolution_clock;
    const clock::time_point tStart = clock::now();

    if (e || stop || srv == nullptr)
    {
        return;
    }

    if (messageCount >= 1000)
    {
        messageCount = 0;
    }

    // Payload: 80-180 KiB of 'a' followed by the sequence number.
    auto msg = std::make_shared<Message<MessageTypes>>();
    msg->message_header.id = MessageTypes::MessageAll;
    msg->Append(std::string(rand() % 102400 + 81920, 'a') + " " + std::to_string(messageCount++));
    msg->TransactionId = "Broadcast";

    const clock::time_point tPrepared = clock::now();
    srv->BroadcastMessage(std::move(msg));
    const clock::time_point tDone = clock::now();

    const auto time = tDone - tPrepared;   // time spent queueing the broadcast
    const auto time2 = tDone - tStart;     // total time spent in this handler
    if (time != previous_time && time > 2us) {
        std::cout << "Broadcast took " << time / 1.0us << "us | "
            << time2 / 1us << "us" << std::endl;
        previous_time = time;
    }

    // Aim for a 30ms period; if this tick already used up most of it, wait a
    // full period instead of firing again almost immediately.
    auto time_expire = 30ms - time2;
    if (time_expire <= 5ms) {
        time_expire = 30ms;
    }

    // Reschedule the timer. expires_after() replaces the deprecated
    // expires_from_now(), which newer Boost.Asio releases no longer provide.
    timer.expires_after(time_expire);
    timer.async_wait(timedBcast);
}
// Server entry point: listens on port 40000, starts the periodic broadcast
// after five seconds, and shuts down when the user presses Enter. A second
// Enter ends the process.
// Returns 0 on normal exit, 1 when the server could not be started.
int main()
{
    messageCount = 1;
    srand(static_cast<unsigned int>(time(nullptr)));
    boost::asio::ip::tcp::endpoint ep{ {}, 40000 };

    try
    {
        srv = new MyServer(ctx.get_executor(), ep);
    }
    catch (const std::exception& ex)
    {
        // Opening/binding the acceptor throws, e.g. when the port is in use.
        std::cerr << "Failed to start server: " << ex.what() << std::endl;
        return 1;
    }
    std::cout << "Hello World!\n";

    // expires_after() replaces the deprecated expires_from_now().
    timer.expires_after(std::chrono::milliseconds(5000));
    timer.async_wait(timedBcast);

    std::string str;
    std::getline(std::cin, str);

    // Raise the stop flag before tearing anything down so a timer tick that
    // fires during shutdown does not start another broadcast.
    stop = true;
    timer.cancel();
    srv->interrupt();
    ctx.stop();
    ctx.join();

    str="";
    std::getline(std::cin, str);

    delete srv;
    srv = nullptr;
    return 0;
}
#pragma once
#include "prerequisites.h"
template <typename MsgId, typename Executor>
class Server
{
public:
Server(Executor executor, boost::asio::ip::tcp::endpoint endpoint) : executor_(executor)
{
acceptor_.open(endpoint.protocol());
acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
acceptor_.set_option(boost::asio::ip::tcp::acceptor::do_not_route(true));
acceptor_.set_option(boost::asio::ip::tcp::acceptor::keep_alive(false));
acceptor_.set_option(boost::asio::ip::tcp::acceptor::enable_connection_aborted(false));
acceptor_.set_option(boost::asio::ip::tcp::acceptor::linger(false, 3));
acceptor_.bind(endpoint);
acceptor_.listen();
start_accept();
}
void interrupt() {
shutdownBegan = true;
boost::asio::post(strand_, [this] {
acceptor_.cancel();
acceptor_.close();
for (const auto& [key, value] : connections) {
if (auto conn = value.lock())
{
conn->Disconnect(true, true, true);
}
}
connections.clear();
shutdownCompleted = true;
});
}
void BroadcastMessage(MsgPtr<MsgId> msg)
{
boost::asio::post(strand_, [this, msg = std::move(msg)]{
for (const auto& kvp : connections)
{
boost::asio::post(executor_, [this, handle = kvp.second, msg]{
if (auto conn = handle.lock())
{
if (!conn->IsInvalid())
{
conn->Send(std::move(msg));
}
}
});
}
});
}
virtual ~Server() = default;
bool IsShutDownCompleted() {
return shutdownCompleted;
}
protected:
// This server class should override thse functions to implement
// customised functionality
// Called when a client connects, you can veto the connection by returning false
virtual bool OnClientConnect(ConnPtr<MsgId,Executor> const& client)
{
return false;
}
// Called when a client appears to have disconnected
virtual void OnClientDisconnect(ConnPtr<MsgId, Executor> const& client)
{
}
// Called when a message arrives
virtual void OnMessage(MsgPtr<MsgId> const& message,ConnPtr<MsgId, Executor> const& client)
{
}
virtual void OnMessageSent(MsgPtr<MsgId> const& message,ConnPtr<MsgId, Executor> const& client)
{
}
private:
void client_disconnected(ConnPtr<MsgId, Executor> const& client)
{
OnClientDisconnect(client);
removeConnection(client);
}
void client_message(MsgPtr<MsgId> const& message, ConnPtr<MsgId, Executor> const& client)
{
OnMessage(message,client);
}
void message_sent(MsgPtr<MsgId> const& message, ConnPtr<MsgId, Executor> const& client)
{
OnMessageSent(message,client);
}
void start_accept() {
if (!shutdownBegan && this->acceptor_.is_open())
{
auto new_connection = conn_t<MsgId, Executor>::create(
boost::asio::make_strand(executor_), connectionIds++,
boost::bind(&Server<MsgId, Executor>::client_message,this, boost::placeholders::_1, boost::placeholders::_2),
boost::bind(&Server<MsgId, Executor>::message_sent, this, boost::placeholders::_1, boost::placeholders::_2),
boost::bind(&Server<MsgId, Executor>::client_disconnected, this, boost::placeholders::_1)
);
acceptor_.async_accept(new_connection->socket(), boost::bind(&Server<MsgId, Executor>::handle_accept, this, new_connection, boost::asio::placeholders::error));
}
}
#pragma endregion
void handle_accept(ConnPtr<MsgId, Executor> new_connection, boost::system::error_code ec)
{
if (!ec && !shutdownBegan)
{
start_accept();
if (OnClientConnect(new_connection))
{
new_connection->accepted();
addConnection(std::move(new_connection));
}
}
else if (!shutdownBegan)
{
start_accept();
}
}
#pragma region Add Connection
void addConnection(ConnPtr<MsgId, Executor> connection)
{
if (!shutdownBegan)
{
boost::asio::post(strand_, [this, conn = std::move(connection)]()mutable{
connections.emplace(conn->GetId(), std::move(conn));
for (auto it = connections.begin(); it != connections.end();) {
if (it->second.expired()) {
connections.erase(it);
return;
}
else
++it;
}
});
}
}
#pragma endregion
#pragma region Remove Connection
void removeConnectionById(int id)
{
if (!shutdownBegan)
{
boost::asio::post(strand_, [this, id]()mutable {connections.erase(id); });
}
}
void removeConnection(ConnPtr<MsgId, Executor> const& connection)
{
removeConnectionById(connection->GetId());
}
#pragma endregion
Executor executor_;
strand_t<Executor> strand_=boost::asio::make_strand(executor_);
acceptor_t<Executor> acceptor_{ strand_ };
std::map<int, WeakConnPtr<MsgId, Executor>> connections;
std::atomic_bool shutdownBegan;
std::atomic_bool shutdownCompleted;
int connectionIds{ 10000 };
};
#pragma once
#include "prerequisites.h"
// Double-ended queue whose individual operations are serialised by a mutex.
//
// Each call is atomic on its own, but a sequence such as
// "if (!q.empty()) q.pop_front();" is not: callers must make sure that only
// one thread removes elements, or otherwise coordinate externally.
template<typename T>
class ThreadSafeQueue
{
public:
    ThreadSafeQueue() = default;
    ThreadSafeQueue(const ThreadSafeQueue<T>&) = delete;
    ThreadSafeQueue& operator=(const ThreadSafeQueue<T>&) = delete;
    virtual ~ThreadSafeQueue() = default;

    // Returns the item at the front of the queue without removing it.
    // Precondition: the queue is not empty. The reference is only valid until
    // the element is removed, so it must not be used while another thread may
    // pop or clear.
    const T& front() const
    {
        std::scoped_lock lock(muxQueue);
        assert(!deqQueue.empty());
        return deqQueue.front();
    }

    // Returns the item at the back of the queue without removing it.
    // Same precondition and lifetime caveat as front().
    const T& back() const
    {
        std::scoped_lock lock(muxQueue);
        assert(!deqQueue.empty());
        return deqQueue.back();
    }

    // Removes and returns the item at the front of the queue.
    // Precondition: the queue is not empty.
    T pop_front()
    {
        std::scoped_lock lock(muxQueue);
        assert(!deqQueue.empty());
        T item = std::move(deqQueue.front());
        deqQueue.pop_front();
        return item;
    }

    // Removes and returns the item at the back of the queue.
    // Precondition: the queue is not empty.
    T pop_back()
    {
        std::scoped_lock lock(muxQueue);
        assert(!deqQueue.empty());
        T item = std::move(deqQueue.back());
        deqQueue.pop_back();
        return item;
    }

    // Adds a copy of item to the back of the queue.
    void push_back(const T& item)
    {
        std::scoped_lock lock(muxQueue);
        deqQueue.push_back(item);
    }

    // Moves item to the back of the queue. (The former single overload took a
    // const reference and applied std::move to it, which silently copied and
    // did not compile for move-only types.)
    void push_back(T&& item)
    {
        std::scoped_lock lock(muxQueue);
        deqQueue.push_back(std::move(item));
    }

    // Adds a copy of item to the front of the queue.
    void push_front(const T& item)
    {
        std::scoped_lock lock(muxQueue);
        deqQueue.push_front(item);
    }

    // Moves item to the front of the queue.
    void push_front(T&& item)
    {
        std::scoped_lock lock(muxQueue);
        deqQueue.push_front(std::move(item));
    }

    // Returns true if the queue has no items.
    bool empty() const
    {
        std::scoped_lock lock(muxQueue);
        return deqQueue.empty();
    }

    // Returns the number of items in the queue.
    size_t count() const
    {
        std::scoped_lock lock(muxQueue);
        return deqQueue.size();
    }

    // Removes all items.
    void clear()
    {
        std::scoped_lock lock(muxQueue);
        deqQueue.clear();
    }

protected:
    // mutable so the read-only observers can lock it.
    mutable std::mutex muxQueue;
    std::deque<T> deqQueue;
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment