Skip to content

Instantly share code, notes, and snippets.

@jammerxd
Created September 14, 2021 22:50
Show Gist options
  • Save jammerxd/a2c54169d35915f2a102533850123920 to your computer and use it in GitHub Desktop.
Save jammerxd/a2c54169d35915f2a102533850123920 to your computer and use it in GitHub Desktop.
#pragma once
#include "prerequisites.h"
template <typename T>
class Connection : public boost::enable_shared_from_this<Connection<T>>
{
public:
enum class owner
{
server,
client
};
static boost::shared_ptr<Connection<T>> create(
boost::asio::io_context& io_context, int id,
ClientMessageCallbackType<T> message_handle, ClientMessageSentCallbackType<T> messagesent_handle, ClientDisconnectCallbackType<T> disconnect_handle,
ThreadSafeQueue<OwnedMessage<T>>& qIn) {
return boost::shared_ptr<Connection<T>>(new Connection<T>(io_context,id,message_handle,messagesent_handle,disconnect_handle,qIn));
}
//void ModelItem::SubscribeItemCreated(const ModelItemSlot& slot)
//{
// _itemCreated.connect(slot);
//}
/*static void RegisterBroadcastSignal(boost::signals2::signal<void(Message<T>&)>& signal, Connection<T>* connection)
{
//bind(&ModelItem::OnItemCreated, listener, _1)
RegisterSignals(signal,boost::bind(&Connection<T>::Send, connection, boost::placeholders::_1));
}
void UnRegisterBroadcastSignal(Connection<T>* connection)
{
//signal.disconnect(boost::bind(&Connection::Send, connection, boost::placeholders::_1));
}
static void RegisterSignals(boost::signals2::signal<void(Message<T>&)>& signal,boost::signals2::signal<void(Message<T>&)>::slot_type& slot)
{
signal.connect(slot);
}*/
boost::asio::ip::tcp::socket& socket() {
return this->socket_;
}
void accepted(boost::signals2::connection bcast_connection) {
#ifdef VERBOSE_SERVER_DEBUG
std::stringstream ss;
ss << "[ Client " << this->GetId() << " ] " << "Connected.";
std::string output = ss.str();
std::cout << output << std::endl;
output = "";
ss.str("");
#endif
this->c = bcast_connection;
boost::asio::ip::tcp::no_delay option(true);
this->socket_.set_option(option);
this->ReadHeader();
}
int GetId() {
return this->connectionId;
}
void Disconnect(bool cancel,bool shutdown, bool close) {
this->qMessagesOut.clear();
if (this->socket_.is_open() && !this->alreadyDisconnected)
{
this->alreadyDisconnected = true;
boost::system::error_code ec;
try
{
if (cancel)
this->socket_.cancel();
}
catch (const std::exception& ex)
{
std::cout << "[ Client " << this->GetId() << " ] Cancel Exception: " << ex.what() << std::endl;
}
try
{
if (shutdown)
this->socket_.shutdown(boost::asio::socket_base::shutdown_both, ec);
if (ec)
{
std::cout << " [ Client " << this->GetId() << " ] Shutdown Error: " << ec.value() << " - " << ec.message();
}
}
catch (const std::exception& ex)
{
std::cout << "[ Client " << this->GetId() << " ] Shutdown Exception: " << ex.what() << std::endl;
}
try
{
if (close && !ec)
this->socket_.close();
}
catch (const std::exception& ex)
{
std::cout << "[ Client " << this->GetId() << " ] Close Exception: " << ex.what() << std::endl;
}
//this->broadcastSignal.disconnect(boost::bind(&Connection::Send, this);
//UnRegisterBroadcastSignal(this->broadcastSignal, this);
this->c.disconnect();
#ifdef VERBOSE_SERVER_DEBUG
std::stringstream ss;
ss << "[ Client " << this->GetId() << " ] " << "Disconnected.";
std::string output = ss.str();
std::cout << output << std::endl;
output = "";
ss.str("");
#endif
if (this->disconnect_handler != nullptr)
{
this->disconnect_handler(this->shared_from_this());
}
}
this->invalidState = true;
}
bool IsInvalid() const {
return !this->socket_.is_open() || this->invalidState;
}
void Send(const Message<T>& msg)
{
bool bEmpty = qMessagesOut.empty();
qMessagesOut.push_back(msg);
if (bEmpty)
{
WriteHeader();
}
}
size_t GetSendBacklog()
{
if (!IsInvalid())
{
return qMessagesOut.count();
}
return 0;
}
~Connection() {
Disconnect(true,true,true);
}
private:
Connection(boost::asio::io_context& io_context, int id, ClientMessageCallbackType<T> message_handle, ClientMessageSentCallbackType<T> messagesent_handle, ClientDisconnectCallbackType<T> disconnect_handle, ThreadSafeQueue<OwnedMessage<T>>& qIn) : socket_(io_context), strand_(io_context), qMessagesIn(qIn){
this->connectionId = id;
this->tempInMsg = Message<T>();
this->invalidState = false;
this->disconnect_handler = disconnect_handle;
this->message_handler = message_handle;
this->alreadyDisconnected = false;
this->messagesent_handler = messagesent_handle;
}
ClientDisconnectCallbackType<T> disconnect_handler;
ClientMessageCallbackType<T> message_handler;
ClientMessageSentCallbackType<T> messagesent_handler;
void ReadHeader() {
auto self = this->shared_from_this();
boost::asio::async_read(socket_, boost::asio::buffer(&this->tempInMsg.message_header, sizeof(MessageHeader<T>)),
strand_.wrap([this, self](std::error_code ec, std::size_t length)
{
if (!ec)
{
// A complete message header has been read, check if this message
// has a body to follow...
if (this->tempInMsg.message_header.size > 0)
{
// ...it does, so allocate enough space in the messages' body
// vector, and issue asio with the task to read the body.
this->tempInMsg.body.resize(this->tempInMsg.message_header.size);
this->ReadBody();
}
else
{
// it doesn't, so add this bodyless message to the connections
// incoming message queue
AddToIncomingMessageQueue();
//Go back to waiting for header
this->ReadHeader();
}
}
else
{
// Reading form the client went wrong, most likely a disconnect
// has occurred. Close the socket and let the system tidy it up later.
#ifdef VERBOSE_SERVER_DEBUG
std::stringstream ss;
ss << "[ Client " << this->GetId() << " ] " << "Read Header Failed." << " " << ec.value() << " - " << ec.message();
std::string output = ss.str();
std::cout << output << std::endl;
output = "";
ss.str("");
#endif
this->invalidState = true;
this->Disconnect(false,true,true);
}
}));
}
void ReadBody() {
auto self = this->shared_from_this();
boost::asio::async_read(socket_, boost::asio::buffer(this->tempInMsg.body.data(), this->tempInMsg.body.size()),
strand_.wrap([this, self](std::error_code ec, std::size_t length)
{
if (!ec)
{
// ...and they have! The message is now complete, so add
// the whole message to incoming queue
AddToIncomingMessageQueue();
#ifdef VERBOSE_SERVER_DEBUG
std::stringstream ss;
ss << "[ Client " << this->GetId() << " ] SENT: " << this->tempInMsg.body.data();
std::string output = ss.str();
std::cout << output << std::endl;
output = "";
ss.str("");
#endif
//Go Back to waiting for header
this->ReadHeader();
}
else
{
// As above!
#ifdef VERBOSE_SERVER_DEBUG
std::stringstream ss;
ss << "[ Client " << this->GetId() << " ] " << "Read Body Failed." << " " << ec.value() << " - " << ec.message();
std::string output = ss.str();
std::cout << output << std::endl;
output = "";
ss.str("");
#endif
this->invalidState = true;
this->Disconnect(false,true,true);
}
}));
}
void WriteHeader()
{
auto self = this->shared_from_this();
// If this function is called, we know the outgoing message queue must have
// at least one message to send. So allocate a transmission buffer to hold
// the message, and issue the work - asio, send these bytes
if(!qMessagesOut.empty())
{
Message<T> msg = qMessagesOut.pop_front();
Message<T>* message = new Message<T>();
message->body = msg.body;
message->message_header = msg.message_header;
message->TransactionId = msg.TransactionId;
boost::asio::async_write(socket_, boost::asio::buffer(&message->message_header, sizeof(MessageHeader<T>)),
strand_.wrap([this, self, message](std::error_code ec, std::size_t length)
{
// asio has now sent the bytes - if there was a problem
// an error would be available...
if (!ec)
{
// ... no error, so check if the message header just sent also
// has a message body...
if (message->body.size() > 0)
{
//Message<T>* message = new Message<T>();
//message->body = qMessagesOut.front().body;
//message->message_header = qMessagesOut.front().message_header;
//message->TransactionId = msg.TransactionId;
// ...it does, so issue the task to write the body bytes
WriteBody(message);
//qMessagesOut.pop_front();
}
else
{
if (this->messagesent_handler != nullptr)
{
OwnedMessage<T>* tempOutMsg = new OwnedMessage<T>();
//tempOutMsg->msg = qMessagesOut.front();
tempOutMsg->msg = *message;
tempOutMsg->remote = this->shared_from_this();
this->messagesent_handler(tempOutMsg);
delete tempOutMsg;
delete message;
}
// ...it didnt, so we are done with this message. Remove it from
// the outgoing message queue
qMessagesOut.pop_front();
// If the queue is not empty, there are more messages to send, so
// make this happen by issuing the task to send the next header.
if (!qMessagesOut.empty())
{
WriteHeader();
}
}
#ifdef VERBOSE_SERVER_DEBUG
std::stringstream ss;
ss << "[ Client " << this->GetId() << " ] " << "Wrote " << length << " bytes. (header)";
std::string output = ss.str();
std::cout << output << std::endl;
output = "";
ss.str("");
#endif
}
else
{
// ...asio failed to write the message, we could analyse why but
// for now simply assume the connection has died by closing the
// socket. When a future attempt to write to this client fails due
// to the closed socket, it will be tidied up.
#ifdef VERBOSE_SERVER_DEBUG
std::stringstream ss;
ss << "[ Client " << this->GetId() << " ] " << "Write Header Fail." << " " << ec.value() << " - " << ec.message();
std::string output = ss.str();
std::cout << output << std::endl;
output = "";
ss.str("");
#endif
this->invalidState = true;
this->Disconnect(true, true, true);
}
}));
}
}
// ASYNC - Prime context to write a message body
void WriteBody(Message<T>* message)
{
auto self = this->shared_from_this();
// If this function is called, a header has just been sent, and that header
// indicated a body existed for this message. Fill a transmission buffer
// with the body data, and send it!
boost::asio::async_write(socket_, boost::asio::buffer(message->body.data(), message->body.size()),
strand_.wrap([this, self,message](std::error_code ec, std::size_t length)
{
if (!ec)
{
if (this->messagesent_handler != nullptr)
{
OwnedMessage<T>* tempOutMsg = new OwnedMessage<T>();
tempOutMsg->msg = *message;
tempOutMsg->remote = this->shared_from_this();
this->messagesent_handler(tempOutMsg);
delete tempOutMsg;
delete message;
}
// Sending was successful, so we are done with the message
// and remove it from the queue
//qMessagesOut.pop_front();
// If the queue still has messages in it, then issue the task to
// send the next messages' header.
if (!qMessagesOut.empty())
{
WriteHeader();
}
#ifdef VERBOSE_SERVER_DEBUG
std::stringstream ss;
ss << "[ Client " << this->GetId() << " ] " << "Wrote " << length << " bytes. (body)";
std::string output = ss.str();
std::cout << output << std::endl;
output = "";
ss.str("");
#endif
}
else
{
// Sending failed, see WriteHeader() equivalent for description :P
#ifdef VERBOSE_SERVER_DEBUG
std::stringstream ss;
ss << "[ Client " << this->GetId() << " ] " << "Write Body Fail." << " " << ec.value() << " - " << ec.message();
std::string output = ss.str();
std::cout << output << std::endl;
output = "";
ss.str("");
#endif
this->invalidState = true;
this->Disconnect(true,true,true);
}
}));
}
void AddToIncomingMessageQueue()
{
OwnedMessage<T> msg{ this->shared_from_this(), tempInMsg };
//qMessagesIn.push_back(msg);
if (this->message_handler != nullptr)
{
this->message_handler(msg);
}
msg.msg.body.clear();
msg.msg.message_header.size = 0;
tempInMsg.body.clear();
tempInMsg.message_header.size = 0;
}
boost::asio::ip::tcp::socket socket_;
boost::asio::io_context::strand strand_;
int connectionId;
Message<T> tempInMsg;
Message<T> tempOutMsg;
ThreadSafeQueue<OwnedMessage<T>>& qMessagesIn;
ThreadSafeQueue<Message<T>> qMessagesOut;
boost::signals2::connection c;
bool invalidState;
bool alreadyDisconnected;
};
#pragma once
#include "prerequisites.h"
template <typename T>
struct Message
{
size_t size() const {
return this->body.size();
}
MessageHeader<T> message_header{};
std::vector<unsigned char> body;
std::string TransactionId;
void Append(const char* data)
{
size_t i = body.size();
size_t data_length = strlen(data);
body.resize(body.size() + sizeof(size_t) + data_length);
std::memcpy(body.data() + i, &data_length, sizeof(size_t));
std::memcpy(body.data() + sizeof(size_t) + i, data, data_length);
message_header.size = (uint32_t)size();
}
void Append(std::string& data)
{
const char* data_cStr = data.c_str();
Append(data_cStr);
}
void GetString(size_t offset, std::string& dst)
{
if (body.size() >= offset + sizeof(size_t))
{
size_t length;
std::memcpy(&length, body.data() + offset, sizeof(size_t));
if (body.size() >= offset + sizeof(size_t) + length)
{
dst.assign(length, body[sizeof(size_t) + offset]);
}
}
std::string error = "ERROR";
dst.assign(error);
}
std::string GetString(size_t offset)
{
if (body.size() >= offset + sizeof(size_t))
{
size_t length;
std::memcpy(&length, body.data() + offset, sizeof(size_t));
if (body.size() >= offset + sizeof(size_t) + length)
{
std::string s = std::string(body.begin() + offset + sizeof(size_t), body.begin() + offset + sizeof(size_t) + length);//WORKS
return s;
}
}
std::string error = "ERROR";
return error;
}
// Override for std::cout compatibility - produces friendly description of message
friend std::ostream& operator << (std::ostream& os, const Message<T>& msg)
{
os << "ID:" << int(msg.message_header.id) << " Size:" << msg.message_header.size;
return os;
}
// Convenience Operator overloads - These allow us to add and remove stuff from
// the body vector as if it were a stack, so First in, Last Out. These are a
// template in itself, because we dont know what data type the user is pushing or
// popping, so lets allow them all. NOTE: It assumes the data type is fundamentally
// Plain Old Data (POD). TLDR: Serialise & Deserialise into/from a vector
// Pushes any POD-like data into the message buffer
template<typename DataType>
friend Message<T>& operator << (Message<T>& msg, const DataType& data)
{
// Check that the type of the data being pushed is trivially copyable
static_assert(std::is_standard_layout<DataType>::value, "Data is too complex to be pushed into vector");
// Cache current size of vector, as this will be the point we insert the data
size_t i = msg.body.size();
// Resize the vector by the size of the data being pushed
msg.body.resize(msg.body.size() + sizeof(DataType));
// Physically copy the data into the newly allocated vector space
std::memcpy(msg.body.data() + i, &data, sizeof(DataType));
// Recalculate the message size
msg.message_header.size = (uint32_t)msg.size();
// Return the target message so it can be "chained"
return msg;
}
// Pulls any POD-like data form the message buffer
template<typename DataType>
friend Message<T>& operator >> (Message<T>& msg, DataType& data)
{
// Check that the type of the data being pushed is trivially copyable
static_assert(std::is_standard_layout<DataType>::value, "Data is too complex to be pulled from vector");
// Cache the location towards the end of the vector where the pulled data starts
size_t i = msg.body.size() - sizeof(DataType);
// Physically copy the data from the vector into the user variable
std::memcpy(&data, msg.body.data() + i, sizeof(DataType));
// Shrink the vector to remove read bytes, and reset end position
msg.body.resize(i);
// Recalculate the message size
msg.message_header.size = msg.size();
// Return the target message so it can be "chained"
return msg;
}
};
#pragma once
#include "prerequisites.h"
template<typename T>
struct MessageHeader
{
T id{};
uint32_t size = 0;
};
#pragma once
#include "prerequisites.h"
template <typename T>
class Connection;
template <typename T>
struct OwnedMessage
{
boost::shared_ptr<Connection<T>> remote = nullptr;
Message<T> msg;
// Again, a friendly string maker
friend std::ostream& operator<<(std::ostream& os, const OwnedMessage<T>& msg)
{
os << msg.msg;
return os;
}
};
#pragma once
// Windows stuff.
#define _CRT_SECURE_NO_WARNINGS
#define NOMINMAX
//boost asio
#include <boost/asio.hpp>
#include <ctime>
#include <cstring>
#include <chrono>
#include <time.h>
#include <sstream>
#include <string>
#include <vector>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <stdarg.h>
#include <iostream>
#include <thread>
#include <future>
#include <algorithm>
#include <cstdlib>
#include <list>
#include <set>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <execution>
#ifdef _WIN32
#include <ShlObj.h>
#include <Shlwapi.h>
#endif
#include <iostream>
#include <ostream>
#include <sstream>
#include <errno.h>
#include <stdint.h>
#include <fstream>
#include <stdexcept>
#include <math.h>
#include <random>
#include <climits>
#include <memory>
#include <utility>
#include <queue>
#include <optional>
#include <boost/bind/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include "boost/date_time/posix_time/posix_time_types.hpp"
#include <boost/thread/mutex.hpp>
#include <boost/functional.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include <boost/signals2.hpp>
//Function pointer called CallbackType that takes a float
//and returns an int
//typedef int (*CallbackType)(float);
template<typename T>
class Connection;
template<typename T>
struct Message;
template<typename T>
struct OwnedMessage;
//template<typename T>
//using ClientDisconnectCallbackType = void (*)(boost::shared_ptr<Connection<T>>);
template<typename T>
using ClientDisconnectCallbackType = boost::function<void(boost::shared_ptr<Connection<T>>)>;
template<typename T>
using ClientMessageCallbackType = boost::function<void(OwnedMessage<T>&)>;
template<typename T>
using ClientMessageSentCallbackType = boost::function<void(OwnedMessage<T>*)>;
#include "MessageHeader.h"
#include "Message.h"
#include "OwnedMessage.h"
#include "ThreadSafeQueue.h"
#include "Connection.h"
#include "Server.h"
// Server-Test.cpp : This file contains the 'main' function. Program execution begins and ends there.
//
#include "prerequisites.h"
boost::asio::io_context context;
int messageCount;
boost::thread_group tg;
enum class MessageTypes : uint32_t
{
ServerAccept,
ServerDeny,
ServerPing,
MessageAll,
SendText,
ServerMessage,
ServerMessage1,
ServerMessage2,
ServerMessage3,
ServerMessage4,
ServerMessage5,
ServerMessage6,
ServerMessage7,
ServerMessage8,
ServerMessage9,
};
class MyServer : public Server<MessageTypes>
{
public:
MyServer(boost::asio::io_context& context, boost::asio::ip::tcp::endpoint ep) : Server<MessageTypes>(context,ep)
{
}
virtual void OnClientDisconnect(boost::shared_ptr<Connection<MessageTypes>> client)
{
std::cout << "[ Client " << client->GetId() << " ] Disconnected" << std::endl;
}
virtual bool OnClientConnect(boost::shared_ptr<Connection<MessageTypes>> client)
{
std::cout << "[ Client " << client->GetId() << " ] Connected" << std::endl;
Message<MessageTypes> msg;
msg.message_header.id = MessageTypes::ServerAccept;
msg << client->GetId();
client->Send(msg);
return true;
}
virtual void OnMessage(OwnedMessage<MessageTypes>& msg)
{
std::cout << "[ Client " << msg.remote->GetId() << " ] ";
if (msg.msg.message_header.id == MessageTypes::SendText)
{
std::string message= msg.msg.GetString(0);
std::cout << " Received Message" << std::endl;
message = "";
msg.remote->Send(msg.msg);//fire it back to the client
}
}
virtual void OnMessageSent(OwnedMessage<MessageTypes>* msg)
{
//std::cout << "[ Client " << msg->remote->GetId() << " ] ";
//std::cout << " Sent Message" << std::endl;
}
};
MyServer* srv;
void RunAsioContext()
{
std::cout << "IO Thread Starting - Id: " << boost::this_thread::get_id() << std::endl;
context.run();
std::cout << "IO Thread finished - Id: " << boost::this_thread::get_id() << std::endl;
}
long long previous_time = 0;
long long highest_time = 0;
int total_thread_count = 0;
size_t max_thread_count = 1;
boost::asio::io_service io_service;
boost::posix_time::seconds interval(1);
boost::posix_time::seconds interval2(5);
boost::asio::deadline_timer timer(io_service, interval);
volatile bool stop = false;
void timedBcast(const boost::system::error_code& e)
{
//std::cout << "Beginning BCAST..." << std::endl;
std::chrono::high_resolution_clock::time_point t2 =
std::chrono::high_resolution_clock::now();
//while (!context.stopped() && !io_service.stopped() && !e && !stop)
if (!context.stopped() && !io_service.stopped() && !e && !stop)
{
if (srv != nullptr)
{
if (messageCount >= 1000)
{
messageCount = 0;
}
//std::cout << "SENDING BCAST" << std::endl;
//std::string message = "HELLO WORLD TO ALL BROADCAST! ";
//message += std::to_string(messageCount++);
int msg_sz = rand() % 102400 + 81920;
std::string message = std::string(msg_sz, 'a');
message += " ";
message += std::to_string(messageCount++);
Message<MessageTypes> msg;
msg.message_header.id = MessageTypes::MessageAll;
msg.Append(message);
msg.TransactionId = "Broadcast";
std::chrono::high_resolution_clock::time_point t1 =
std::chrono::high_resolution_clock::now();
srv->BroadcastMessage(msg);
long long time = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - t1).count();
long long time2 = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - t2).count();
if (time != previous_time && time > 6)
{
//timer += time - 6;
std::cout << "Broadcast took " << time << "ms | " << time2 << "ms" << std::endl;
previous_time = time;
}
int time_expire = 100 - time2 - 1;
if (time_expire <= 5)
{
time_expire = 50;
}
// Reschedule the timer for 1 second in the future:
timer.expires_from_now(boost::posix_time::milliseconds(time_expire));
// Posts the timer event
timer.async_wait(timedBcast);
std::cout << "BROADCAST" << std::endl;
}
}
//std::cout << "Exited bcast" << std::endl;
//timer.cancel();
}
void monitorBuffer()
{
int delay_ms = 1000;
while (!context.stopped())
{
boost::this_thread::sleep_for(boost::chrono::milliseconds(delay_ms));
if (srv != nullptr)
{
auto avg = srv->CalculateAverageBacklog();
int num_threads = avg / 5 + avg % 5;
for (int i = 0; i < num_threads; i++)
{
tg.create_thread(&RunAsioContext);
}
if (num_threads > 0)
{
std::cout << "ADDING MORE THREADS " << num_threads;
}
}
}
}
int main()
{
messageCount = 1;
context.reset();
io_service.reset();
srand(time(NULL));
boost::asio::ip::tcp::endpoint ep = boost::asio::ip::tcp::endpoint(boost::asio::ip::address_v4::loopback(), 40000);
srv = new MyServer(context, ep);
for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i)
tg.create_thread(&RunAsioContext);
std::cout << "Hello World!\n";
timer.expires_from_now(boost::posix_time::milliseconds(5000));
timer.async_wait(timedBcast);
boost::thread t1([] {io_service.run(); });
t1.detach();
//boost::thread t2(&monitorBuffer);
//t2.detach();
std::string str;
std::getline(std::cin, str);
srv->interrupt();
//t1.interrupt();
stop = true;
timer.cancel();
io_service.stop();
timer.cancel();
context.stop();
tg.join_all();
if (t1.joinable())
{
t1.join();
}
//if (t2.joinable())
//{
// t2.join();
//}
str="";
std::getline(std::cin, str);
delete srv;
return 0;
}
// Run program: Ctrl + F5 or Debug > Start Without Debugging menu
// Debug program: F5 or Debug > Start Debugging menu
// Tips for Getting Started:
// 1. Use the Solution Explorer window to add/manage files
// 2. Use the Team Explorer window to connect to source control
// 3. Use the Output window to see build output and other messages
// 4. Use the Error List window to view errors
// 5. Go to Project > Add New Item to create new code files, or Project > Add Existing Item to add existing code files to the project
// 6. In the future, to open this project again, go to File > Open > Project and select the .sln file
#pragma once
#include "prerequisites.h"
template <typename T>
class Server
{
public:
Server(boost::asio::io_context& io_context, boost::asio::ip::tcp::endpoint endpoint) :io_context_(io_context), acceptor_(io_context)
{
this->connectionIds = 10000;
this->acceptor_.open(endpoint.protocol());
this->acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
this->acceptor_.set_option(boost::asio::ip::tcp::acceptor::do_not_route(true));
this->acceptor_.set_option(boost::asio::ip::tcp::acceptor::keep_alive(false));
this->acceptor_.set_option(boost::asio::ip::tcp::acceptor::enable_connection_aborted(false));
this->acceptor_.set_option(boost::asio::ip::tcp::acceptor::linger(false, 3));
this->acceptor_.bind(endpoint);
this->acceptor_.listen();
this->shutdownBeginning = false;
this->shutdownCompleted = false;
std::unique_lock<std::mutex> conMutex(this->connectionMutex);
this->connections = std::map<int,boost::shared_ptr<Connection<T>>>();
this->connectionsToRemove = std::vector<int>();
conMutex.unlock();
//this->sendingThread = std::thread(&Server::processMsgs, this);
this->start_accept();
}
void interrupt() {
this->shutdownBeginning = true;
//this->messageVar.notify_one();
this->acceptor_.cancel();
this->acceptor_.close();
std::unique_lock<std::mutex> conMutex(this->connectionMutex);
for (const auto& [key, value] : this->connections) {
value->Disconnect(true,true,true);
}
this->connections.clear();
conMutex.unlock();
this->shutdownCompleted = true;
}
unsigned long long CalculateAverageBacklog()
{
size_t total = 0;
size_t count = 0;
std::unique_lock<std::mutex> conMutex(this->connectionMutex);
for (const auto& [key, value] : this->connections)
{
if (!value->IsInvalid())
{
size_t backlog = value->GetSendBacklog();
total += backlog;
count++;
}
}
conMutex.unlock();
if (count > 0)
{
auto average = total / count;
return average;
}
return 0;
}
void BroadcastMessage(Message<T>& msg)
{
std::unique_lock<std::mutex> conMutex(this->connectionMutex);
/*for (const auto& [key, value] : this->connections)
{
if (!value->IsInvalid())
{
value->Send(msg);
}
}*/
std::for_each(
std::execution::par_unseq,
this->connections.begin(),
this->connections.end(),
[msg](auto&& item)
{
item.second->Send(msg);
});
conMutex.unlock();
//this->bcast(msg);
}
~Server() {}
bool IsShutDownCompleted() {
return this->shutdownCompleted;
}
protected:
// This server class should override thse functions to implement
// customised functionality
// Called when a client connects, you can veto the connection by returning false
virtual bool OnClientConnect(boost::shared_ptr<Connection<T>> client)
{
return false;
}
// Called when a client appears to have disconnected
virtual void OnClientDisconnect(boost::shared_ptr<Connection<T>> client)
{
}
// Called when a message arrives
virtual void OnMessage(OwnedMessage<T>& message)
{
}
virtual void OnMessageSent(OwnedMessage<T>* message)
{
}
private:
void client_disconnected(boost::shared_ptr<Connection<T>> connection)
{
OnClientDisconnect(connection);
removeConnection(connection);
}
void client_message(OwnedMessage<T>& message)
{
OnMessage(message);
}
void message_sent(OwnedMessage<T>* message)
{
OnMessageSent(message);
}
void start_accept() {
if (!this->shutdownBeginning && this->acceptor_.is_open())
{
int id = this->connectionIds++;
boost::shared_ptr<Connection<T>> new_connection =
Connection<T>::create(this->io_context_, id, boost::bind(&Server::client_message, this, boost::placeholders::_1), boost::bind(&Server::message_sent, this, boost::placeholders::_1), boost::bind(&Server::client_disconnected, this, boost::placeholders::_1), qMessagesIn);
this->acceptor_.async_accept(new_connection->socket(),
boost::bind(&Server::handle_accept, this, new_connection,
boost::asio::placeholders::error));
}
}
#pragma region Handle the Connection Acceptance
void handle_accept(boost::shared_ptr<Connection<T>> new_connection,
const boost::system::error_code& error) {
if (!error && !this->shutdownBeginning)
{
this->start_accept();
if (OnClientConnect(new_connection))
{
boost::signals2::connection c = this->bcast.connect(boost::bind(&Connection<T>::Send, new_connection, boost::placeholders::_1));
new_connection->accepted(c);
this->addConnection(std::move(new_connection));
}
else
{
#ifdef VERBOSE_SERVER_DEBUG
std::cout << "[ Client " << new_connection->GetId() << " ] Connection Denied." << std::endl;
#endif
}
}
else if (!this->shutdownBeginning)
{
this->start_accept();
#ifdef VERBOSE_SERVER_DEBUG
std::cout << "[ SERVER ] New connection error: " << error.message() << std::endl;
#endif
}
}
#pragma endregion
#pragma region Add Connection
void addConnection(boost::shared_ptr<Connection<T>> connection)
{
if (!this->shutdownBeginning)
{
std::unique_lock<std::mutex> conMutex(this->connectionMutex);
this->connections.insert(std::make_pair(connection->GetId(),std::move(connection)));
conMutex.unlock();
}
}
#pragma endregion
#pragma region Remove Connection
void removeConnectionById(int id)
{
if (!this->shutdownBeginning)
{
std::unique_lock<std::mutex> conMutex(this->connectionMutex);
this->connections.erase(id);
conMutex.unlock();
}
}
void removeConnection(boost::shared_ptr<Connection<T>> connection)
{
if (!this->shutdownBeginning)
{
int id = connection->GetId();
std::unique_lock<std::mutex> conMutex(this->connectionMutex);
this->connections.erase(id);
conMutex.unlock();
}
}
#pragma endregion
boost::asio::io_context& io_context_;
boost::asio::ip::tcp::acceptor acceptor_;
std::mutex connectionMutex;
std::map<int,boost::shared_ptr<Connection<T>>> connections;
std::vector<int> connectionsToRemove;
volatile bool shutdownBeginning;
volatile bool shutdownCompleted;
int connectionIds;
ThreadSafeQueue<OwnedMessage<T>> qMessagesIn;
boost::signals2::signal<void(Message<T>&)> bcast;
};
#pragma once
#include "prerequisites.h"
template<typename T>
class ThreadSafeQueue
{
public:
ThreadSafeQueue() = default;
ThreadSafeQueue(const ThreadSafeQueue<T>&) = delete;
ThreadSafeQueue& operator=(const ThreadSafeQueue<T>&) = delete;
virtual ~ThreadSafeQueue() { clear(); }
// Returns and maintains item at front of Queue
const T& front()
{
std::scoped_lock lock(muxQueue);
return deqQueue.front();
}
// Returns and maintains item at back of Queue
const T& back()
{
std::scoped_lock lock(muxQueue);
return deqQueue.back();
}
// Removes and returns item from front of Queue
T pop_front()
{
std::scoped_lock lock(muxQueue);
auto t = std::move(deqQueue.front());
deqQueue.pop_front();
return t;
}
// Removes and returns item from back of Queue
T pop_back()
{
std::scoped_lock lock(muxQueue);
auto t = std::move(deqQueue.back());
deqQueue.pop_back();
return t;
}
// Adds an item to back of Queue
void push_back(const T& item)
{ std::scoped_lock lock(muxQueue);
deqQueue.emplace_back(std::move(item));
}
// Adds an item to front of Queue
void push_front(const T& item)
{
std::scoped_lock lock(muxQueue);
deqQueue.emplace_front(std::move(item));
}
// Returns true if Queue has no items
bool empty()
{
std::scoped_lock lock(muxQueue);
return deqQueue.empty();
}
// Returns number of items in Queue
size_t count()
{
std::scoped_lock lock(muxQueue);
return deqQueue.size();
}
// Clears Queue
void clear()
{
std::scoped_lock lock(muxQueue);
deqQueue.clear();
}
protected:
std::mutex muxQueue;
std::deque<T> deqQueue;
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment