-
-
Save jammerxd/a2c54169d35915f2a102533850123920 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include "prerequisites.h" | |
template <typename T> | |
class Connection : public boost::enable_shared_from_this<Connection<T>> | |
{ | |
public: | |
enum class owner | |
{ | |
server, | |
client | |
}; | |
static boost::shared_ptr<Connection<T>> create( | |
boost::asio::io_context& io_context, int id, | |
ClientMessageCallbackType<T> message_handle, ClientMessageSentCallbackType<T> messagesent_handle, ClientDisconnectCallbackType<T> disconnect_handle, | |
ThreadSafeQueue<OwnedMessage<T>>& qIn) { | |
return boost::shared_ptr<Connection<T>>(new Connection<T>(io_context,id,message_handle,messagesent_handle,disconnect_handle,qIn)); | |
} | |
//void ModelItem::SubscribeItemCreated(const ModelItemSlot& slot) | |
//{ | |
// _itemCreated.connect(slot); | |
//} | |
/*static void RegisterBroadcastSignal(boost::signals2::signal<void(Message<T>&)>& signal, Connection<T>* connection) | |
{ | |
//bind(&ModelItem::OnItemCreated, listener, _1) | |
RegisterSignals(signal,boost::bind(&Connection<T>::Send, connection, boost::placeholders::_1)); | |
} | |
void UnRegisterBroadcastSignal(Connection<T>* connection) | |
{ | |
//signal.disconnect(boost::bind(&Connection::Send, connection, boost::placeholders::_1)); | |
} | |
static void RegisterSignals(boost::signals2::signal<void(Message<T>&)>& signal,boost::signals2::signal<void(Message<T>&)>::slot_type& slot) | |
{ | |
signal.connect(slot); | |
}*/ | |
boost::asio::ip::tcp::socket& socket() { | |
return this->socket_; | |
} | |
void accepted(boost::signals2::connection bcast_connection) { | |
#ifdef VERBOSE_SERVER_DEBUG | |
std::stringstream ss; | |
ss << "[ Client " << this->GetId() << " ] " << "Connected."; | |
std::string output = ss.str(); | |
std::cout << output << std::endl; | |
output = ""; | |
ss.str(""); | |
#endif | |
this->c = bcast_connection; | |
boost::asio::ip::tcp::no_delay option(true); | |
this->socket_.set_option(option); | |
this->ReadHeader(); | |
} | |
int GetId() { | |
return this->connectionId; | |
} | |
void Disconnect(bool cancel,bool shutdown, bool close) { | |
this->qMessagesOut.clear(); | |
if (this->socket_.is_open() && !this->alreadyDisconnected) | |
{ | |
this->alreadyDisconnected = true; | |
boost::system::error_code ec; | |
try | |
{ | |
if (cancel) | |
this->socket_.cancel(); | |
} | |
catch (const std::exception& ex) | |
{ | |
std::cout << "[ Client " << this->GetId() << " ] Cancel Exception: " << ex.what() << std::endl; | |
} | |
try | |
{ | |
if (shutdown) | |
this->socket_.shutdown(boost::asio::socket_base::shutdown_both, ec); | |
if (ec) | |
{ | |
std::cout << " [ Client " << this->GetId() << " ] Shutdown Error: " << ec.value() << " - " << ec.message(); | |
} | |
} | |
catch (const std::exception& ex) | |
{ | |
std::cout << "[ Client " << this->GetId() << " ] Shutdown Exception: " << ex.what() << std::endl; | |
} | |
try | |
{ | |
if (close && !ec) | |
this->socket_.close(); | |
} | |
catch (const std::exception& ex) | |
{ | |
std::cout << "[ Client " << this->GetId() << " ] Close Exception: " << ex.what() << std::endl; | |
} | |
//this->broadcastSignal.disconnect(boost::bind(&Connection::Send, this); | |
//UnRegisterBroadcastSignal(this->broadcastSignal, this); | |
this->c.disconnect(); | |
#ifdef VERBOSE_SERVER_DEBUG | |
std::stringstream ss; | |
ss << "[ Client " << this->GetId() << " ] " << "Disconnected."; | |
std::string output = ss.str(); | |
std::cout << output << std::endl; | |
output = ""; | |
ss.str(""); | |
#endif | |
if (this->disconnect_handler != nullptr) | |
{ | |
this->disconnect_handler(this->shared_from_this()); | |
} | |
} | |
this->invalidState = true; | |
} | |
bool IsInvalid() const { | |
return !this->socket_.is_open() || this->invalidState; | |
} | |
void Send(const Message<T>& msg) | |
{ | |
bool bEmpty = qMessagesOut.empty(); | |
qMessagesOut.push_back(msg); | |
if (bEmpty) | |
{ | |
WriteHeader(); | |
} | |
} | |
size_t GetSendBacklog() | |
{ | |
if (!IsInvalid()) | |
{ | |
return qMessagesOut.count(); | |
} | |
return 0; | |
} | |
~Connection() { | |
Disconnect(true,true,true); | |
} | |
private: | |
Connection(boost::asio::io_context& io_context, int id, ClientMessageCallbackType<T> message_handle, ClientMessageSentCallbackType<T> messagesent_handle, ClientDisconnectCallbackType<T> disconnect_handle, ThreadSafeQueue<OwnedMessage<T>>& qIn) : socket_(io_context), strand_(io_context), qMessagesIn(qIn){ | |
this->connectionId = id; | |
this->tempInMsg = Message<T>(); | |
this->invalidState = false; | |
this->disconnect_handler = disconnect_handle; | |
this->message_handler = message_handle; | |
this->alreadyDisconnected = false; | |
this->messagesent_handler = messagesent_handle; | |
} | |
ClientDisconnectCallbackType<T> disconnect_handler; | |
ClientMessageCallbackType<T> message_handler; | |
ClientMessageSentCallbackType<T> messagesent_handler; | |
void ReadHeader() { | |
auto self = this->shared_from_this(); | |
boost::asio::async_read(socket_, boost::asio::buffer(&this->tempInMsg.message_header, sizeof(MessageHeader<T>)), | |
strand_.wrap([this, self](std::error_code ec, std::size_t length) | |
{ | |
if (!ec) | |
{ | |
// A complete message header has been read, check if this message | |
// has a body to follow... | |
if (this->tempInMsg.message_header.size > 0) | |
{ | |
// ...it does, so allocate enough space in the messages' body | |
// vector, and issue asio with the task to read the body. | |
this->tempInMsg.body.resize(this->tempInMsg.message_header.size); | |
this->ReadBody(); | |
} | |
else | |
{ | |
// it doesn't, so add this bodyless message to the connections | |
// incoming message queue | |
AddToIncomingMessageQueue(); | |
//Go back to waiting for header | |
this->ReadHeader(); | |
} | |
} | |
else | |
{ | |
// Reading form the client went wrong, most likely a disconnect | |
// has occurred. Close the socket and let the system tidy it up later. | |
#ifdef VERBOSE_SERVER_DEBUG | |
std::stringstream ss; | |
ss << "[ Client " << this->GetId() << " ] " << "Read Header Failed." << " " << ec.value() << " - " << ec.message(); | |
std::string output = ss.str(); | |
std::cout << output << std::endl; | |
output = ""; | |
ss.str(""); | |
#endif | |
this->invalidState = true; | |
this->Disconnect(false,true,true); | |
} | |
})); | |
} | |
void ReadBody() { | |
auto self = this->shared_from_this(); | |
boost::asio::async_read(socket_, boost::asio::buffer(this->tempInMsg.body.data(), this->tempInMsg.body.size()), | |
strand_.wrap([this, self](std::error_code ec, std::size_t length) | |
{ | |
if (!ec) | |
{ | |
// ...and they have! The message is now complete, so add | |
// the whole message to incoming queue | |
AddToIncomingMessageQueue(); | |
#ifdef VERBOSE_SERVER_DEBUG | |
std::stringstream ss; | |
ss << "[ Client " << this->GetId() << " ] SENT: " << this->tempInMsg.body.data(); | |
std::string output = ss.str(); | |
std::cout << output << std::endl; | |
output = ""; | |
ss.str(""); | |
#endif | |
//Go Back to waiting for header | |
this->ReadHeader(); | |
} | |
else | |
{ | |
// As above! | |
#ifdef VERBOSE_SERVER_DEBUG | |
std::stringstream ss; | |
ss << "[ Client " << this->GetId() << " ] " << "Read Body Failed." << " " << ec.value() << " - " << ec.message(); | |
std::string output = ss.str(); | |
std::cout << output << std::endl; | |
output = ""; | |
ss.str(""); | |
#endif | |
this->invalidState = true; | |
this->Disconnect(false,true,true); | |
} | |
})); | |
} | |
void WriteHeader() | |
{ | |
auto self = this->shared_from_this(); | |
// If this function is called, we know the outgoing message queue must have | |
// at least one message to send. So allocate a transmission buffer to hold | |
// the message, and issue the work - asio, send these bytes | |
if(!qMessagesOut.empty()) | |
{ | |
Message<T> msg = qMessagesOut.pop_front(); | |
Message<T>* message = new Message<T>(); | |
message->body = msg.body; | |
message->message_header = msg.message_header; | |
message->TransactionId = msg.TransactionId; | |
boost::asio::async_write(socket_, boost::asio::buffer(&message->message_header, sizeof(MessageHeader<T>)), | |
strand_.wrap([this, self, message](std::error_code ec, std::size_t length) | |
{ | |
// asio has now sent the bytes - if there was a problem | |
// an error would be available... | |
if (!ec) | |
{ | |
// ... no error, so check if the message header just sent also | |
// has a message body... | |
if (message->body.size() > 0) | |
{ | |
//Message<T>* message = new Message<T>(); | |
//message->body = qMessagesOut.front().body; | |
//message->message_header = qMessagesOut.front().message_header; | |
//message->TransactionId = msg.TransactionId; | |
// ...it does, so issue the task to write the body bytes | |
WriteBody(message); | |
//qMessagesOut.pop_front(); | |
} | |
else | |
{ | |
if (this->messagesent_handler != nullptr) | |
{ | |
OwnedMessage<T>* tempOutMsg = new OwnedMessage<T>(); | |
//tempOutMsg->msg = qMessagesOut.front(); | |
tempOutMsg->msg = *message; | |
tempOutMsg->remote = this->shared_from_this(); | |
this->messagesent_handler(tempOutMsg); | |
delete tempOutMsg; | |
delete message; | |
} | |
// ...it didnt, so we are done with this message. Remove it from | |
// the outgoing message queue | |
qMessagesOut.pop_front(); | |
// If the queue is not empty, there are more messages to send, so | |
// make this happen by issuing the task to send the next header. | |
if (!qMessagesOut.empty()) | |
{ | |
WriteHeader(); | |
} | |
} | |
#ifdef VERBOSE_SERVER_DEBUG | |
std::stringstream ss; | |
ss << "[ Client " << this->GetId() << " ] " << "Wrote " << length << " bytes. (header)"; | |
std::string output = ss.str(); | |
std::cout << output << std::endl; | |
output = ""; | |
ss.str(""); | |
#endif | |
} | |
else | |
{ | |
// ...asio failed to write the message, we could analyse why but | |
// for now simply assume the connection has died by closing the | |
// socket. When a future attempt to write to this client fails due | |
// to the closed socket, it will be tidied up. | |
#ifdef VERBOSE_SERVER_DEBUG | |
std::stringstream ss; | |
ss << "[ Client " << this->GetId() << " ] " << "Write Header Fail." << " " << ec.value() << " - " << ec.message(); | |
std::string output = ss.str(); | |
std::cout << output << std::endl; | |
output = ""; | |
ss.str(""); | |
#endif | |
this->invalidState = true; | |
this->Disconnect(true, true, true); | |
} | |
})); | |
} | |
} | |
// ASYNC - Prime context to write a message body | |
void WriteBody(Message<T>* message) | |
{ | |
auto self = this->shared_from_this(); | |
// If this function is called, a header has just been sent, and that header | |
// indicated a body existed for this message. Fill a transmission buffer | |
// with the body data, and send it! | |
boost::asio::async_write(socket_, boost::asio::buffer(message->body.data(), message->body.size()), | |
strand_.wrap([this, self,message](std::error_code ec, std::size_t length) | |
{ | |
if (!ec) | |
{ | |
if (this->messagesent_handler != nullptr) | |
{ | |
OwnedMessage<T>* tempOutMsg = new OwnedMessage<T>(); | |
tempOutMsg->msg = *message; | |
tempOutMsg->remote = this->shared_from_this(); | |
this->messagesent_handler(tempOutMsg); | |
delete tempOutMsg; | |
delete message; | |
} | |
// Sending was successful, so we are done with the message | |
// and remove it from the queue | |
//qMessagesOut.pop_front(); | |
// If the queue still has messages in it, then issue the task to | |
// send the next messages' header. | |
if (!qMessagesOut.empty()) | |
{ | |
WriteHeader(); | |
} | |
#ifdef VERBOSE_SERVER_DEBUG | |
std::stringstream ss; | |
ss << "[ Client " << this->GetId() << " ] " << "Wrote " << length << " bytes. (body)"; | |
std::string output = ss.str(); | |
std::cout << output << std::endl; | |
output = ""; | |
ss.str(""); | |
#endif | |
} | |
else | |
{ | |
// Sending failed, see WriteHeader() equivalent for description :P | |
#ifdef VERBOSE_SERVER_DEBUG | |
std::stringstream ss; | |
ss << "[ Client " << this->GetId() << " ] " << "Write Body Fail." << " " << ec.value() << " - " << ec.message(); | |
std::string output = ss.str(); | |
std::cout << output << std::endl; | |
output = ""; | |
ss.str(""); | |
#endif | |
this->invalidState = true; | |
this->Disconnect(true,true,true); | |
} | |
})); | |
} | |
void AddToIncomingMessageQueue() | |
{ | |
OwnedMessage<T> msg{ this->shared_from_this(), tempInMsg }; | |
//qMessagesIn.push_back(msg); | |
if (this->message_handler != nullptr) | |
{ | |
this->message_handler(msg); | |
} | |
msg.msg.body.clear(); | |
msg.msg.message_header.size = 0; | |
tempInMsg.body.clear(); | |
tempInMsg.message_header.size = 0; | |
} | |
boost::asio::ip::tcp::socket socket_; | |
boost::asio::io_context::strand strand_; | |
int connectionId; | |
Message<T> tempInMsg; | |
Message<T> tempOutMsg; | |
ThreadSafeQueue<OwnedMessage<T>>& qMessagesIn; | |
ThreadSafeQueue<Message<T>> qMessagesOut; | |
boost::signals2::connection c; | |
bool invalidState; | |
bool alreadyDisconnected; | |
}; | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include "prerequisites.h" | |
template <typename T> | |
struct Message | |
{ | |
size_t size() const { | |
return this->body.size(); | |
} | |
MessageHeader<T> message_header{}; | |
std::vector<unsigned char> body; | |
std::string TransactionId; | |
void Append(const char* data) | |
{ | |
size_t i = body.size(); | |
size_t data_length = strlen(data); | |
body.resize(body.size() + sizeof(size_t) + data_length); | |
std::memcpy(body.data() + i, &data_length, sizeof(size_t)); | |
std::memcpy(body.data() + sizeof(size_t) + i, data, data_length); | |
message_header.size = (uint32_t)size(); | |
} | |
void Append(std::string& data) | |
{ | |
const char* data_cStr = data.c_str(); | |
Append(data_cStr); | |
} | |
void GetString(size_t offset, std::string& dst) | |
{ | |
if (body.size() >= offset + sizeof(size_t)) | |
{ | |
size_t length; | |
std::memcpy(&length, body.data() + offset, sizeof(size_t)); | |
if (body.size() >= offset + sizeof(size_t) + length) | |
{ | |
dst.assign(length, body[sizeof(size_t) + offset]); | |
} | |
} | |
std::string error = "ERROR"; | |
dst.assign(error); | |
} | |
std::string GetString(size_t offset) | |
{ | |
if (body.size() >= offset + sizeof(size_t)) | |
{ | |
size_t length; | |
std::memcpy(&length, body.data() + offset, sizeof(size_t)); | |
if (body.size() >= offset + sizeof(size_t) + length) | |
{ | |
std::string s = std::string(body.begin() + offset + sizeof(size_t), body.begin() + offset + sizeof(size_t) + length);//WORKS | |
return s; | |
} | |
} | |
std::string error = "ERROR"; | |
return error; | |
} | |
// Override for std::cout compatibility - produces friendly description of message | |
friend std::ostream& operator << (std::ostream& os, const Message<T>& msg) | |
{ | |
os << "ID:" << int(msg.message_header.id) << " Size:" << msg.message_header.size; | |
return os; | |
} | |
// Convenience Operator overloads - These allow us to add and remove stuff from | |
// the body vector as if it were a stack, so First in, Last Out. These are a | |
// template in itself, because we dont know what data type the user is pushing or | |
// popping, so lets allow them all. NOTE: It assumes the data type is fundamentally | |
// Plain Old Data (POD). TLDR: Serialise & Deserialise into/from a vector | |
// Pushes any POD-like data into the message buffer | |
template<typename DataType> | |
friend Message<T>& operator << (Message<T>& msg, const DataType& data) | |
{ | |
// Check that the type of the data being pushed is trivially copyable | |
static_assert(std::is_standard_layout<DataType>::value, "Data is too complex to be pushed into vector"); | |
// Cache current size of vector, as this will be the point we insert the data | |
size_t i = msg.body.size(); | |
// Resize the vector by the size of the data being pushed | |
msg.body.resize(msg.body.size() + sizeof(DataType)); | |
// Physically copy the data into the newly allocated vector space | |
std::memcpy(msg.body.data() + i, &data, sizeof(DataType)); | |
// Recalculate the message size | |
msg.message_header.size = (uint32_t)msg.size(); | |
// Return the target message so it can be "chained" | |
return msg; | |
} | |
// Pulls any POD-like data form the message buffer | |
template<typename DataType> | |
friend Message<T>& operator >> (Message<T>& msg, DataType& data) | |
{ | |
// Check that the type of the data being pushed is trivially copyable | |
static_assert(std::is_standard_layout<DataType>::value, "Data is too complex to be pulled from vector"); | |
// Cache the location towards the end of the vector where the pulled data starts | |
size_t i = msg.body.size() - sizeof(DataType); | |
// Physically copy the data from the vector into the user variable | |
std::memcpy(&data, msg.body.data() + i, sizeof(DataType)); | |
// Shrink the vector to remove read bytes, and reset end position | |
msg.body.resize(i); | |
// Recalculate the message size | |
msg.message_header.size = msg.size(); | |
// Return the target message so it can be "chained" | |
return msg; | |
} | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include "prerequisites.h" | |
// Fixed-size prefix of every message.
//   id   - message kind, value-initialised
//   size - body length in bytes, 0 for a header-only message
template<class T>
struct MessageHeader
{
    T id{};
    uint32_t size{ 0 };
};
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include "prerequisites.h" | |
template <typename T> | |
class Connection; | |
template <typename T> | |
struct OwnedMessage | |
{ | |
boost::shared_ptr<Connection<T>> remote = nullptr; | |
Message<T> msg; | |
// Again, a friendly string maker | |
friend std::ostream& operator<<(std::ostream& os, const OwnedMessage<T>& msg) | |
{ | |
os << msg.msg; | |
return os; | |
} | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
// Windows stuff. | |
#define _CRT_SECURE_NO_WARNINGS | |
#define NOMINMAX | |
//boost asio | |
#include <boost/asio.hpp>
// C library headers
#include <assert.h>
#include <errno.h>
#include <math.h>
#include <stdarg.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
// C++ standard library
#include <algorithm>
#include <atomic>
#include <chrono>
#include <climits>
#include <condition_variable>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <execution>
#include <fstream>
#include <functional>
#include <future>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <ostream>
#include <queue>
#include <random>
#include <set>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
// Platform headers (kept after boost/asio.hpp, as in the original ordering)
#ifdef _WIN32
#include <ShlObj.h>
#include <Shlwapi.h>
#endif
// Remaining Boost headers
#include <boost/bind/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include "boost/date_time/posix_time/posix_time_types.hpp"
#include <boost/thread/mutex.hpp>
#include <boost/functional.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include <boost/signals2.hpp>
//Function pointer called CallbackType that takes a float | |
//and returns an int | |
//typedef int (*CallbackType)(float); | |
template<typename T> | |
class Connection; | |
template<typename T> | |
struct Message; | |
template<typename T> | |
struct OwnedMessage; | |
//template<typename T> | |
//using ClientDisconnectCallbackType = void (*)(boost::shared_ptr<Connection<T>>); | |
template<typename T> | |
using ClientDisconnectCallbackType = boost::function<void(boost::shared_ptr<Connection<T>>)>; | |
template<typename T> | |
using ClientMessageCallbackType = boost::function<void(OwnedMessage<T>&)>; | |
template<typename T> | |
using ClientMessageSentCallbackType = boost::function<void(OwnedMessage<T>*)>; | |
#include "MessageHeader.h" | |
#include "Message.h" | |
#include "OwnedMessage.h" | |
#include "ThreadSafeQueue.h" | |
#include "Connection.h" | |
#include "Server.h" | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Server-Test.cpp : This file contains the 'main' function. Program execution begins and ends there. | |
// | |
#include "prerequisites.h" | |
boost::asio::io_context context; | |
int messageCount; | |
boost::thread_group tg; | |
// Message identifiers stored in MessageHeader::id. The numeric values are
// spelled out because the header is copied byte-for-byte onto the socket, so
// they must stay stable.
enum class MessageTypes : uint32_t
{
    ServerAccept   = 0,
    ServerDeny     = 1,
    ServerPing     = 2,
    MessageAll     = 3,
    SendText       = 4,
    ServerMessage  = 5,
    ServerMessage1 = 6,
    ServerMessage2 = 7,
    ServerMessage3 = 8,
    ServerMessage4 = 9,
    ServerMessage5 = 10,
    ServerMessage6 = 11,
    ServerMessage7 = 12,
    ServerMessage8 = 13,
    ServerMessage9 = 14,
};
// Test server: logs connects/disconnects, greets every client with its id and
// echoes SendText messages back to the sender.
class MyServer : public Server<MessageTypes>
{
public:
    MyServer(boost::asio::io_context& context, boost::asio::ip::tcp::endpoint ep) : Server<MessageTypes>(context, ep)
    {
    }

    // `override` makes the compiler verify that each hook really matches the
    // virtual function in Server<MessageTypes>.
    void OnClientDisconnect(boost::shared_ptr<Connection<MessageTypes>> client) override
    {
        std::cout << "[ Client " << client->GetId() << " ] Disconnected" << std::endl;
    }

    // Accepts every client and sends it a ServerAccept message carrying its id.
    bool OnClientConnect(boost::shared_ptr<Connection<MessageTypes>> client) override
    {
        std::cout << "[ Client " << client->GetId() << " ] Connected" << std::endl;
        Message<MessageTypes> msg;
        msg.message_header.id = MessageTypes::ServerAccept;
        msg << client->GetId();
        client->Send(msg);
        return true;
    }

    // Echoes SendText messages; other message kinds only produce the log prefix.
    void OnMessage(OwnedMessage<MessageTypes>& msg) override
    {
        std::cout << "[ Client " << msg.remote->GetId() << " ] ";
        if (msg.msg.message_header.id == MessageTypes::SendText)
        {
            std::cout << " Received Message" << std::endl;
            msg.remote->Send(msg.msg);//fire it back to the client
        }
    }

    // Intentionally silent.
    void OnMessageSent(OwnedMessage<MessageTypes>* /*msg*/) override
    {
    }
};
MyServer* srv; | |
void RunAsioContext() | |
{ | |
std::cout << "IO Thread Starting - Id: " << boost::this_thread::get_id() << std::endl; | |
context.run(); | |
std::cout << "IO Thread finished - Id: " << boost::this_thread::get_id() << std::endl; | |
} | |
long long previous_time = 0; | |
long long highest_time = 0; | |
int total_thread_count = 0; | |
size_t max_thread_count = 1; | |
boost::asio::io_service io_service; | |
boost::posix_time::seconds interval(1); | |
boost::posix_time::seconds interval2(5); | |
boost::asio::deadline_timer timer(io_service, interval); | |
volatile bool stop = false; | |
// Timer callback: broadcasts one randomly sized text message to every client and
// re-arms the timer. Does nothing (and does not re-arm) when either io object is
// stopped, the wait reported an error, `stop` is set or no server exists.
void timedBcast(const boost::system::error_code& e)
{
    using hr_clock = std::chrono::high_resolution_clock;
    using std::chrono::duration_cast;
    using std::chrono::milliseconds;

    const hr_clock::time_point entered = hr_clock::now();

    if (context.stopped() || io_service.stopped() || e || stop)
    {
        return;
    }
    if (srv == nullptr)
    {
        return;
    }

    // Keep the counter appended to the payload below 1000.
    if (messageCount >= 1000)
    {
        messageCount = 0;
    }

    // Payload: 81920..184319 'a' characters, a space, then the running counter.
    const int payload_len = rand() % 102400 + 81920;
    std::string text(payload_len, 'a');
    text += " ";
    text += std::to_string(messageCount++);

    Message<MessageTypes> outgoing;
    outgoing.message_header.id = MessageTypes::MessageAll;
    outgoing.Append(text);
    outgoing.TransactionId = "Broadcast";

    const hr_clock::time_point send_started = hr_clock::now();
    srv->BroadcastMessage(outgoing);
    const long long send_ms = duration_cast<milliseconds>(hr_clock::now() - send_started).count();
    const long long total_ms = duration_cast<milliseconds>(hr_clock::now() - entered).count();

    // Only report slow broadcasts, and only when the duration changed.
    if (send_ms != previous_time && send_ms > 6)
    {
        std::cout << "Broadcast took " << send_ms << "ms | " << total_ms << "ms" << std::endl;
        previous_time = send_ms;
    }

    // Aim for roughly one broadcast per 100 ms; fall back to 50 ms when this
    // round already used up (almost) the whole slot.
    int next_delay_ms = 100 - total_ms - 1;
    if (next_delay_ms <= 5)
    {
        next_delay_ms = 50;
    }

    timer.expires_from_now(boost::posix_time::milliseconds(next_delay_ms));
    timer.async_wait(timedBcast);
    std::cout << "BROADCAST" << std::endl;
}
void monitorBuffer() | |
{ | |
int delay_ms = 1000; | |
while (!context.stopped()) | |
{ | |
boost::this_thread::sleep_for(boost::chrono::milliseconds(delay_ms)); | |
if (srv != nullptr) | |
{ | |
auto avg = srv->CalculateAverageBacklog(); | |
int num_threads = avg / 5 + avg % 5; | |
for (int i = 0; i < num_threads; i++) | |
{ | |
tg.create_thread(&RunAsioContext); | |
} | |
if (num_threads > 0) | |
{ | |
std::cout << "ADDING MORE THREADS " << num_threads; | |
} | |
} | |
} | |
} | |
int main() | |
{ | |
messageCount = 1; | |
context.reset(); | |
io_service.reset(); | |
srand(time(NULL)); | |
boost::asio::ip::tcp::endpoint ep = boost::asio::ip::tcp::endpoint(boost::asio::ip::address_v4::loopback(), 40000); | |
srv = new MyServer(context, ep); | |
for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i) | |
tg.create_thread(&RunAsioContext); | |
std::cout << "Hello World!\n"; | |
timer.expires_from_now(boost::posix_time::milliseconds(5000)); | |
timer.async_wait(timedBcast); | |
boost::thread t1([] {io_service.run(); }); | |
t1.detach(); | |
//boost::thread t2(&monitorBuffer); | |
//t2.detach(); | |
std::string str; | |
std::getline(std::cin, str); | |
srv->interrupt(); | |
//t1.interrupt(); | |
stop = true; | |
timer.cancel(); | |
io_service.stop(); | |
timer.cancel(); | |
context.stop(); | |
tg.join_all(); | |
if (t1.joinable()) | |
{ | |
t1.join(); | |
} | |
//if (t2.joinable()) | |
//{ | |
// t2.join(); | |
//} | |
str=""; | |
std::getline(std::cin, str); | |
delete srv; | |
return 0; | |
} | |
// Run program: Ctrl + F5 or Debug > Start Without Debugging menu | |
// Debug program: F5 or Debug > Start Debugging menu | |
// Tips for Getting Started: | |
// 1. Use the Solution Explorer window to add/manage files | |
// 2. Use the Team Explorer window to connect to source control | |
// 3. Use the Output window to see build output and other messages | |
// 4. Use the Error List window to view errors | |
// 5. Go to Project > Add New Item to create new code files, or Project > Add Existing Item to add existing code files to the project | |
// 6. In the future, to open this project again, go to File > Open > Project and select the .sln file |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include "prerequisites.h" | |
template <typename T> | |
class Server | |
{ | |
public: | |
// Opens, configures, binds and listens on `endpoint`, resets the bookkeeping
// state and posts the first asynchronous accept. Connection ids start at 10000.
Server(boost::asio::io_context& io_context, boost::asio::ip::tcp::endpoint endpoint) :io_context_(io_context), acceptor_(io_context)
{
    using acceptor_type = boost::asio::ip::tcp::acceptor;

    connectionIds = 10000;

    acceptor_.open(endpoint.protocol());
    acceptor_.set_option(acceptor_type::reuse_address(true));
    acceptor_.set_option(acceptor_type::do_not_route(true));
    acceptor_.set_option(acceptor_type::keep_alive(false));
    acceptor_.set_option(acceptor_type::enable_connection_aborted(false));
    acceptor_.set_option(acceptor_type::linger(false, 3));
    acceptor_.bind(endpoint);
    acceptor_.listen();

    shutdownBeginning = false;
    shutdownCompleted = false;

    {
        // Scope = lifetime of the lock on the connection containers.
        std::unique_lock<std::mutex> guard(connectionMutex);
        connections = std::map<int, boost::shared_ptr<Connection<T>>>();
        connectionsToRemove = std::vector<int>();
    }

    start_accept();
}
void interrupt() { | |
this->shutdownBeginning = true; | |
//this->messageVar.notify_one(); | |
this->acceptor_.cancel(); | |
this->acceptor_.close(); | |
std::unique_lock<std::mutex> conMutex(this->connectionMutex); | |
for (const auto& [key, value] : this->connections) { | |
value->Disconnect(true,true,true); | |
} | |
this->connections.clear(); | |
conMutex.unlock(); | |
this->shutdownCompleted = true; | |
} | |
unsigned long long CalculateAverageBacklog() | |
{ | |
size_t total = 0; | |
size_t count = 0; | |
std::unique_lock<std::mutex> conMutex(this->connectionMutex); | |
for (const auto& [key, value] : this->connections) | |
{ | |
if (!value->IsInvalid()) | |
{ | |
size_t backlog = value->GetSendBacklog(); | |
total += backlog; | |
count++; | |
} | |
} | |
conMutex.unlock(); | |
if (count > 0) | |
{ | |
auto average = total / count; | |
return average; | |
} | |
return 0; | |
} | |
// Queues msg on every known connection while holding the connection lock.
//
// The parallel policy is `par`, not `par_unseq`: Send() copies the message and
// pushes it into a queue, and unsequenced policies do not permit the memory
// allocation / locking that this may involve. msg is captured by reference so
// the message is not copied into the function object.
void BroadcastMessage(Message<T>& msg)
{
    std::lock_guard<std::mutex> guard(this->connectionMutex);
    std::for_each(
        std::execution::par,
        this->connections.begin(),
        this->connections.end(),
        [&msg](auto&& item)
        {
            item.second->Send(msg);
        });
}
// Virtual because Server is a polymorphic base (it declares virtual hooks);
// deleting a derived server through a Server* must run the derived destructor.
virtual ~Server() = default;
bool IsShutDownCompleted() { | |
return this->shutdownCompleted; | |
} | |
protected: | |
// This server class should override thse functions to implement | |
// customised functionality | |
// Called when a client connects, you can veto the connection by returning false | |
virtual bool OnClientConnect(boost::shared_ptr<Connection<T>> client) | |
{ | |
return false; | |
} | |
// Called when a client appears to have disconnected | |
virtual void OnClientDisconnect(boost::shared_ptr<Connection<T>> client) | |
{ | |
} | |
// Called when a message arrives | |
virtual void OnMessage(OwnedMessage<T>& message) | |
{ | |
} | |
virtual void OnMessageSent(OwnedMessage<T>* message) | |
{ | |
} | |
private: | |
void client_disconnected(boost::shared_ptr<Connection<T>> connection) | |
{ | |
OnClientDisconnect(connection); | |
removeConnection(connection); | |
} | |
void client_message(OwnedMessage<T>& message) | |
{ | |
OnMessage(message); | |
} | |
void message_sent(OwnedMessage<T>* message) | |
{ | |
OnMessageSent(message); | |
} | |
void start_accept() { | |
if (!this->shutdownBeginning && this->acceptor_.is_open()) | |
{ | |
int id = this->connectionIds++; | |
boost::shared_ptr<Connection<T>> new_connection = | |
Connection<T>::create(this->io_context_, id, boost::bind(&Server::client_message, this, boost::placeholders::_1), boost::bind(&Server::message_sent, this, boost::placeholders::_1), boost::bind(&Server::client_disconnected, this, boost::placeholders::_1), qMessagesIn); | |
this->acceptor_.async_accept(new_connection->socket(), | |
boost::bind(&Server::handle_accept, this, new_connection, | |
boost::asio::placeholders::error)); | |
} | |
} | |
#pragma region Handle the Connection Acceptance | |
void handle_accept(boost::shared_ptr<Connection<T>> new_connection, | |
const boost::system::error_code& error) { | |
if (!error && !this->shutdownBeginning) | |
{ | |
this->start_accept(); | |
if (OnClientConnect(new_connection)) | |
{ | |
boost::signals2::connection c = this->bcast.connect(boost::bind(&Connection<T>::Send, new_connection, boost::placeholders::_1)); | |
new_connection->accepted(c); | |
this->addConnection(std::move(new_connection)); | |
} | |
else | |
{ | |
#ifdef VERBOSE_SERVER_DEBUG | |
std::cout << "[ Client " << new_connection->GetId() << " ] Connection Denied." << std::endl; | |
#endif | |
} | |
} | |
else if (!this->shutdownBeginning) | |
{ | |
this->start_accept(); | |
#ifdef VERBOSE_SERVER_DEBUG | |
std::cout << "[ SERVER ] New connection error: " << error.message() << std::endl; | |
#endif | |
} | |
} | |
#pragma endregion | |
#pragma region Add Connection | |
// Stores the connection in the map under its id. Ignored once shutdown has
// begun. An existing entry with the same id is left untouched.
void addConnection(boost::shared_ptr<Connection<T>> connection)
{
    if (this->shutdownBeginning)
    {
        return;
    }

    std::lock_guard<std::mutex> guard(this->connectionMutex);
    // Read the id before the pointer is moved into the map entry.
    const int id = connection->GetId();
    this->connections.insert(std::make_pair(id, std::move(connection)));
}
#pragma endregion | |
#pragma region Remove Connection | |
void removeConnectionById(int id) | |
{ | |
if (!this->shutdownBeginning) | |
{ | |
std::unique_lock<std::mutex> conMutex(this->connectionMutex); | |
this->connections.erase(id); | |
conMutex.unlock(); | |
} | |
} | |
// Erases the map entry keyed by the connection's id, if any. Ignored once
// shutdown has begun.
void removeConnection(boost::shared_ptr<Connection<T>> connection)
{
    if (this->shutdownBeginning)
    {
        return;
    }

    // The id is read before the mutex is taken.
    const int connectionId = connection->GetId();

    std::lock_guard<std::mutex> guard(this->connectionMutex);
    this->connections.erase(connectionId);
}
#pragma endregion | |
boost::asio::io_context& io_context_; | |
boost::asio::ip::tcp::acceptor acceptor_; | |
std::mutex connectionMutex; | |
std::map<int,boost::shared_ptr<Connection<T>>> connections; | |
std::vector<int> connectionsToRemove; | |
volatile bool shutdownBeginning; | |
volatile bool shutdownCompleted; | |
int connectionIds; | |
ThreadSafeQueue<OwnedMessage<T>> qMessagesIn; | |
boost::signals2::signal<void(Message<T>&)> bcast; | |
}; | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include "prerequisites.h" | |
/// A double-ended queue whose every operation locks an internal mutex.
///
/// Each member function is individually serialized. Sequences of calls are
/// NOT atomic: for example `if (!q.empty()) q.pop_front();` can still race
/// with another consumer.
///
/// Preconditions: front(), back(), pop_front() and pop_back() require a
/// non-empty queue; calling them on an empty queue is undefined behaviour
/// (they forward to std::deque).
template<typename T>
class ThreadSafeQueue
{
public:
    ThreadSafeQueue() = default;
    ThreadSafeQueue(const ThreadSafeQueue<T>&) = delete;
    ThreadSafeQueue& operator=(const ThreadSafeQueue<T>&) = delete;
    virtual ~ThreadSafeQueue() { clear(); }

    /// Returns a reference to the item at the front without removing it.
    /// The lock is released on return, so the reference is only safe to use
    /// while no other thread removes or relocates that element.
    const T& front() const
    {
        std::scoped_lock lock(muxQueue);
        return deqQueue.front();
    }

    /// Returns a reference to the item at the back without removing it.
    /// Same lifetime caveat as front().
    const T& back() const
    {
        std::scoped_lock lock(muxQueue);
        return deqQueue.back();
    }

    /// Removes the front item and returns it by value (moved out).
    T pop_front()
    {
        std::scoped_lock lock(muxQueue);
        auto t = std::move(deqQueue.front());
        deqQueue.pop_front();
        return t;
    }

    /// Removes the back item and returns it by value (moved out).
    T pop_back()
    {
        std::scoped_lock lock(muxQueue);
        auto t = std::move(deqQueue.back());
        deqQueue.pop_back();
        return t;
    }

    /// Appends a copy of `item` to the back of the queue.
    void push_back(const T& item)
    {
        std::scoped_lock lock(muxQueue);
        // No std::move here: moving from a const reference is just a copy.
        deqQueue.emplace_back(item);
    }

    /// Appends `item` to the back of the queue by moving it in. This
    /// overload is what makes move-only element types usable.
    void push_back(T&& item)
    {
        std::scoped_lock lock(muxQueue);
        deqQueue.emplace_back(std::move(item));
    }

    /// Prepends a copy of `item` to the front of the queue.
    void push_front(const T& item)
    {
        std::scoped_lock lock(muxQueue);
        deqQueue.emplace_front(item);
    }

    /// Prepends `item` to the front of the queue by moving it in.
    void push_front(T&& item)
    {
        std::scoped_lock lock(muxQueue);
        deqQueue.emplace_front(std::move(item));
    }

    /// Returns true if the queue holds no items at the time of the call.
    bool empty() const
    {
        std::scoped_lock lock(muxQueue);
        return deqQueue.empty();
    }

    /// Returns the number of items in the queue at the time of the call.
    size_t count() const
    {
        std::scoped_lock lock(muxQueue);
        return deqQueue.size();
    }

    /// Removes every item from the queue.
    void clear()
    {
        std::scoped_lock lock(muxQueue);
        deqQueue.clear();
    }

protected:
    // mutable so that the read-only accessors above can lock while const.
    mutable std::mutex muxQueue;
    std::deque<T> deqQueue;
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment