-
-
Save jammerxd/944692e52ffae328c5d791f9fc879477 to your computer and use it in GitHub Desktop.
Boost Asio Client Server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Client-Test.cpp : This file contains the 'main' function. Program execution begins and ends there. | |
// | |
#include "prerequisites.h" | |
enum class MessageTypes : uint32_t | |
{ | |
ServerAccept, | |
ServerDeny, | |
ServerPing, | |
MessageAll, | |
SendText, | |
ServerMessage, | |
ServerMessage1, | |
ServerMessage2, | |
ServerMessage3, | |
ServerMessage4, | |
ServerMessage5, | |
ServerMessage6, | |
ServerMessage7, | |
ServerMessage8, | |
ServerMessage9, | |
}; | |
std::string ReadLine() | |
{ | |
std::string str = ""; | |
std::getline(std::cin, str); | |
return str; | |
} | |
/*std::condition_variable cv; | |
std::mutex mutex; | |
std::mutex exitMutex; | |
std::condition_variable ecv; | |
int wait_time, num_msgs, msg_size, delay_between_msgs; | |
std::string ReadLine() | |
{ | |
std::string str = ""; | |
std::getline(std::cin, str); | |
return str; | |
} | |
class MyClient : public Client<MessageTypes> | |
{ | |
public: | |
MyClient() : Client<MessageTypes>() | |
{ | |
isExiting = false; | |
isFirst = true; | |
} | |
virtual void OnDisconnect(boost::shared_ptr<Connection<MessageTypes>> client) { | |
std::stringstream ss; | |
ss << "Disconnected."; | |
std::cout << ss.str() << std::endl; | |
isExiting = true; | |
cv.notify_all(); | |
} | |
virtual void OnMessage(OwnedMessage<MessageTypes>& message) { | |
if (message.msg.message_header.id == MessageTypes::ServerAccept) | |
{ | |
int id; | |
message.msg >> id; | |
this->SetId(id); | |
std::stringstream ss; | |
ss << "[ SERVER ] SENT ID: " << id; | |
std::cout << ss.str() << std::endl; | |
ss.str(""); | |
t1 = boost::thread(&MyClient::SendMessages, this); | |
t1.detach(); | |
//cv.notify_all(); | |
} | |
else if (message.msg.message_header.id == MessageTypes::SendText) | |
{ | |
std::stringstream ss; | |
ss << "[ SERVER ] SENT MESSAGE " << message.msg.size() << " bytes long."; | |
std::cout << ss.str() << std::endl; | |
ss.str(""); | |
} | |
else if (message.msg.message_header.id == MessageTypes::MessageAll) | |
{ | |
std::chrono::high_resolution_clock::time_point now = std::chrono::high_resolution_clock::now(); | |
time_taken = std::chrono::duration_cast<std::chrono::milliseconds>(now - time_point).count(); | |
std::string s = message.msg.GetString(0); | |
//std::stringstream ss; | |
//std::string ms = s.substr(s.find_last_of(' ')); | |
//ss << "[ SERVER ] BROADCASTED MESSAGE " << ms; | |
//std::cout << ss.str() << std::endl; | |
//ss.str(""); | |
//s = ""; | |
if (isFirst) | |
{ | |
isFirst = false; | |
} | |
else | |
{ | |
if (time_taken > 55) | |
{ | |
std::cout << "MESSAGE WAS DELAYED (" << std::to_string(time_taken) << "ms)" << std::endl; | |
} | |
} | |
time_point = std::chrono::high_resolution_clock::now(); | |
} | |
} | |
virtual void OnConnect() | |
{ | |
std::cout << "[ DEBUG ] Thread Id: " << std::this_thread::get_id() << std::endl; | |
} | |
virtual void OnMessageSent(Message<MessageTypes>& msg) | |
{ | |
std::cout << "Message sent" << std::endl; | |
cv.notify_all(); | |
} | |
std::chrono::high_resolution_clock::time_point time_point; | |
long long time_taken=0; | |
bool isFirst; | |
boost::thread t1; | |
std::condition_variable cv; | |
std::mutex mutex; | |
bool isExiting; | |
~MyClient() { | |
isExiting = true; | |
Disconnect(); | |
t1.interrupt(); | |
if (t1.joinable()) | |
{ | |
std::cout << "waiting for join" << std::endl; | |
t1.join(); | |
} | |
} | |
private: | |
void SendMessages() | |
{ | |
srand(time(NULL)); | |
num_msgs = rand() % 10 + 1; | |
wait_time = rand() % 30 + 1; | |
num_msgs = 0; | |
char letters[10] = { 'a','b','c','d','e','f','g','h','i','j' }; | |
for (int i = 0; i < num_msgs; i++) | |
{ | |
if (this->IsConnected()) | |
{ | |
msg_size = rand() % 512000 + 409600; | |
delay_between_msgs = rand() % 10 + 1; | |
OwnedMessage<MessageTypes> msg; | |
msg.msg.message_header.id = MessageTypes::SendText; | |
std::string data = std::string(msg_size, letters[i]); | |
msg.msg.Append(data); | |
std::cout << "Sending " << i + 1 << " of " << num_msgs << " | Thread Id: " << std::this_thread::get_id() << std::endl; | |
boost::thread thread2(&MyClient::Send, this, msg.msg); | |
thread2.detach(); | |
std::cout << "Sent " << i + 1 << " of " << num_msgs << " | Using thread Id: " << thread2.get_id() << std::endl; | |
data = ""; | |
std::cout << "Sleeping for " << delay_between_msgs * 1000 << std::endl; | |
} | |
else | |
{ | |
std::cout << "Not connected. Exiting thread." << std::endl; | |
cv.notify_all(); | |
ecv.notify_all(); | |
return; | |
} | |
if (this->IsConnected()) | |
{ | |
boost::this_thread::sleep_for(boost::chrono::milliseconds(delay_between_msgs * 1000)); | |
} | |
else | |
{ | |
std::cout << "Not connected. Exiting thread." << std::endl; | |
cv.notify_all(); | |
ecv.notify_all(); | |
return; | |
} | |
} | |
ecv.notify_all(); | |
} | |
}; | |
int main() | |
{ | |
wait_time = 0; | |
num_msgs = 0; | |
msg_size = 0; | |
std::string host = "localhost"; | |
uint16_t port = 40000; | |
MyClient c; | |
//ReadLine(); | |
std::cout << "Connecting..." << std::endl; | |
c.Connect(host, port); | |
//std::unique_lock<std::mutex> lock(mutex); | |
//cv.wait(lock); | |
//Sleep(wait_time * 1000); | |
ReadLine(); | |
//std::unique_lock<std::mutex> lock(exitMutex); | |
//ecv.wait(lock); | |
c.Disconnect(); | |
return 0; | |
} | |
std::vector<MyClient*> clients; | |
int main() | |
{ | |
std::cout << "Press enter to begin..." << std::endl; | |
ReadLine(); | |
std::string host = "localhost"; | |
uint16_t port = 40000; | |
for (int i = 0; i < 50; i++) | |
{ | |
MyClient* c =new MyClient(); | |
c->Connect(host, port); | |
clients.push_back(c); | |
} | |
ReadLine(); | |
std::for_each( | |
std::execution::par_unseq, | |
clients.begin(), | |
clients.end(), | |
[](auto&& item) | |
{ | |
item->Disconnect(); | |
delete item; | |
}); | |
std::cout << "DONE" << std::endl; | |
return 0; | |
} | |
*/ | |
std::atomic_bool stop = false; | |
std::chrono::high_resolution_clock::duration previous_time; | |
boost::asio::thread_pool ctx; | |
int messageCount; | |
boost::asio::high_resolution_timer timer(ctx); | |
class MyClient : public Client<MessageTypes,boost::asio::any_io_executor> | |
{ | |
public: | |
MyClient(boost::asio::any_io_executor executor) : Client<MessageTypes, boost::asio::any_io_executor>(executor) | |
{ | |
} | |
void OnDisconnect(ConnPtr<MessageTypes, boost::asio::any_io_executor> const& client) override | |
{ | |
} | |
void OnMessage(MsgPtr<MessageTypes> const& message, ConnPtr<MessageTypes, boost::asio::any_io_executor> const& client) override | |
{ | |
if (message->message_header.id == MessageTypes::ServerAccept) | |
{ | |
int id = message->GetInt(0); | |
SetId(id); | |
std::cout << "Received ID " << std::to_string(id); | |
} | |
} | |
void OnConnect(ConnPtr<MessageTypes, boost::asio::any_io_executor> const& client) override | |
{ | |
} | |
void OnMessageSent(MsgPtr<MessageTypes> const& message, ConnPtr<MessageTypes, boost::asio::any_io_executor> const& client) override | |
{ | |
} | |
~MyClient() { | |
Disconnect(); | |
} | |
}; | |
int main() | |
{ | |
MyClient* c; | |
c = new MyClient(ctx.get_executor()); | |
c->Connect("127.0.0.1", 40000); | |
ReadLine(); | |
c->Disconnect(); | |
delete c; | |
return 0; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include "prerequisites.h" | |
template <typename MsgId,typename Executor> | |
class Client | |
{ | |
protected: | |
virtual void OnDisconnect(ConnPtr<MsgId, Executor> const& client) { | |
} | |
virtual void OnMessage(MsgPtr<MsgId> const& message, ConnPtr<MsgId, Executor> const& client) { | |
} | |
virtual void OnConnect(ConnPtr<MsgId, Executor> const& client) | |
{ | |
} | |
virtual void OnMessageSent(MsgPtr<MsgId> const& message, ConnPtr<MsgId, Executor> const& client ) | |
{ | |
} | |
public: | |
bool Connect(const std::string& host, const uint16_t port) | |
{ | |
if (IsConnected()) | |
{ | |
Disconnect(); | |
} | |
try | |
{ | |
// Resolve hostname/ip-address into tangiable physical address | |
boost::asio::ip::tcp::resolver resolver(executor_); | |
boost::asio::ip::tcp::resolver::results_type endpoints = resolver.resolve(host, std::to_string(port)); | |
// Create connection | |
connection = conn_t<MsgId, Executor>::create( | |
executor_, 0, | |
boost::bind(&Client<MsgId, Executor>::client_message, this, boost::placeholders::_1, boost::placeholders::_2), | |
boost::bind(&Client<MsgId, Executor>::message_sent, this, boost::placeholders::_1, boost::placeholders::_2), | |
boost::bind(&Client<MsgId, Executor>::client_disconnected, this, boost::placeholders::_1) | |
); | |
// Tell the connection object to connect to server | |
//this->connection->ConnectToServer(endpoints); | |
boost::asio::async_connect(connection->socket(), endpoints, | |
[this](std::error_code ec, boost::asio::ip::tcp::endpoint endpoint) | |
{ | |
if (!ec) | |
{ | |
connection->accepted(); | |
OnConnect(connection); | |
} | |
}); | |
} | |
catch (std::exception& e) | |
{ | |
std::cerr << "Client Exception: " << e.what() << std::endl; | |
return false; | |
} | |
return true; | |
} | |
void Disconnect() | |
{ | |
if (IsConnected()) | |
{ | |
connection->Disconnect(true,true,true); | |
} | |
} | |
~Client() | |
{ | |
Disconnect(); | |
} | |
Client(Executor executor):executor_(executor){ | |
} | |
bool IsConnected() | |
{ | |
return !connection->IsInvalid(); | |
} | |
void Send(MsgPtr<MsgId> msg) | |
{ | |
if (IsConnected()) | |
{ | |
connection->Send(std::move(msg)); | |
} | |
} | |
void client_disconnected(ConnPtr<MsgId, Executor> const& client) | |
{ | |
OnDisconnect(client); | |
} | |
void client_message(MsgPtr<MsgId> const& message, ConnPtr<MsgId, Executor> const& client) | |
{ | |
OnMessage(message,client); | |
} | |
void message_sent(MsgPtr<MsgId> const& message, ConnPtr<MsgId, Executor> const& client) | |
{ | |
OnMessageSent(message,client); | |
} | |
void SetId(int id) | |
{ | |
connection->SetId(id); | |
} | |
private: | |
ConnPtr<MsgId, Executor> connection; | |
Executor executor_; | |
strand_t<Executor> strand_ = boost::asio::make_strand(executor_); | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include "prerequisites.h" | |
template <typename MsgId,typename Executor> | |
class Connection : public std::enable_shared_from_this<Connection<MsgId,Executor>> | |
{ | |
public: | |
static ConnPtr<MsgId, Executor> create( | |
Executor executor,int id, | |
ClientMessageCallbackType<MsgId,Executor> message_handle, | |
ClientMessageSentCallbackType<MsgId,Executor> messagesent_handle, | |
ClientDisconnectCallbackType<MsgId,Executor> disconnect_handle) { | |
return ConnPtr<MsgId, Executor>( | |
new conn_t<MsgId,Executor>(executor,id,message_handle,messagesent_handle,disconnect_handle)); | |
} | |
boost::asio::ip::tcp::socket& socket() { | |
return socket_; | |
} | |
void accepted() { | |
#ifdef VERBOSE_SERVER_DEBUG | |
std::stringstream ss; | |
ss << "[ Client " << this->GetId() << " ] " << "Connected."; | |
std::string output = ss.str(); | |
std::cout << output << std::endl; | |
output = ""; | |
ss.str(""); | |
#endif | |
boost::asio::ip::tcp::no_delay option(true); | |
socket_.set_option(option); | |
ReadHeader(); | |
} | |
void SetId(int id) | |
{ | |
boost::asio::post(socket_.get_executor(), [this,id] { | |
connectionId = id; | |
}); | |
} | |
int GetId() { | |
return connectionId; | |
} | |
void Disconnect(bool cancel,bool shutdown, bool close) { | |
if (socket_.is_open() && !alreadyDisconnected.exchange(true)) | |
{ | |
this->alreadyDisconnected = true; | |
boost::system::error_code ec; | |
try | |
{ | |
if (cancel) | |
socket_.cancel(); | |
} | |
catch (const std::exception& ex) | |
{ | |
std::cout << "[ Client " << GetId() << " ] Cancel Exception: " << ex.what() << std::endl; | |
} | |
qMessagesOut.clear(); | |
try | |
{ | |
if (shutdown) | |
socket_.shutdown(boost::asio::socket_base::shutdown_both, ec); | |
if (ec) | |
{ | |
std::cout << " [ Client " <<GetId() << " ] Shutdown Error: " << ec.value() << " - " << ec.message(); | |
} | |
} | |
catch (const std::exception& ex) | |
{ | |
std::cout << "[ Client " << GetId() << " ] Shutdown Exception: " << ex.what() << std::endl; | |
} | |
try | |
{ | |
if (close && !ec) | |
socket_.close(); | |
} | |
catch (const std::exception& ex) | |
{ | |
std::cout << "[ Client " << GetId() << " ] Close Exception: " << ex.what() << std::endl; | |
} | |
#ifdef VERBOSE_SERVER_DEBUG | |
std::stringstream ss; | |
ss << "[ Client " << this->GetId() << " ] " << "Disconnected."; | |
std::string output = ss.str(); | |
std::cout << output << std::endl; | |
output = ""; | |
ss.str(""); | |
#endif | |
if (disconnect_handler != nullptr) | |
{ | |
disconnect_handler(this->shared_from_this()); | |
} | |
} | |
invalidState = true; | |
} | |
bool IsInvalid() const { | |
return !socket_.is_open() || invalidState; | |
} | |
void Send(MsgPtr<MsgId> msg) | |
{ | |
qMessagesOut.push_back(std::move(msg)); | |
if (qMessagesOut.count() == 1) | |
{ | |
// TODO FIXME Race condition? | |
WriteMessage(); | |
} | |
} | |
~Connection() { | |
Disconnect(true,true,true); | |
} | |
Connection(Executor executor, int id, | |
ClientMessageCallbackType<MsgId, Executor> message_handle, | |
ClientMessageSentCallbackType<MsgId, Executor> messagesent_handle, | |
ClientDisconnectCallbackType<MsgId, Executor> disconnect_handle) : | |
connectionId(id), | |
disconnect_handler(disconnect_handle), | |
message_handler(message_handle), | |
messagesent_handler(messagesent_handle), | |
socket_(executor) | |
{ | |
} | |
private: | |
ClientDisconnectCallbackType<MsgId, Executor> disconnect_handler; | |
ClientMessageCallbackType<MsgId, Executor> message_handler; | |
ClientMessageSentCallbackType<MsgId, Executor> messagesent_handler; | |
bool Report([[maybe_unused]] std::string_view caption, std::error_code ec | |
//,[[maybe_unused]] auto&&... what | |
) | |
{ | |
if (ec) { | |
#ifdef VERBOSE_SERVER_DEBUG | |
std::cout << "[ Client " << GetId() << " ] " << caption | |
<< " Failed" | |
<< ec.value() << " - " << ec.message() | |
<< std::endl; | |
#endif | |
invalidState = true; | |
Disconnect(true, true, true); | |
return false; | |
} | |
//print_arg(what) | |
#ifdef VERBOSE_SERVER_DEBUG | |
else | |
{ | |
[[maybe_unused]] auto print_arg = [](auto&& v) { | |
std::cout << " " << v; | |
return std::cout.good(); | |
}; | |
std::cout << "[ Client " << GetId() << " ] " << caption << " Success"; | |
std::cout << " " << ec.value() << " - " << ec.message(); | |
if ((true && ... && print_arg(what))) | |
std::cout << std::endl; | |
} | |
#endif | |
return true; | |
} | |
void ReadHeader() { | |
auto self = this->shared_from_this(); | |
boost::asio::async_read(socket_, boost::asio::buffer(&tempInMsg.message_header, sizeof(tempInMsg.message_header)), | |
[this, self = this->shared_from_this()](std::error_code ec, std::size_t length) | |
{ | |
if (Report("Read Header", ec)) { | |
tempInMsg.body.resize(tempInMsg.message_header.size); | |
ReadBody(); | |
} | |
}); | |
} | |
void ReadBody() { | |
boost::asio::async_read(socket_, boost::asio::buffer(tempInMsg.body), | |
[this, self= this->shared_from_this()](std::error_code ec, std::size_t length) | |
{ | |
//if (Report("Read Body", ec, tempInMsg.body.data())) { | |
if (Report("Read Body", ec)) { | |
CommitIncoming(); | |
// Go Back to waiting for header | |
ReadHeader(); | |
} | |
}); | |
} | |
void WriteMessage() | |
{ | |
if (!qMessagesOut.empty()) | |
{ | |
auto message = qMessagesOut.pop_front(); | |
std::vector buffers | |
{ | |
boost::asio::buffer(&message->message_header, sizeof(MessageHeader<MsgId>)) , | |
boost::asio::buffer(message->body) | |
}; | |
boost::asio::async_write(socket_, buffers, | |
[this, self= this->shared_from_this(),message](std::error_code ec, std::size_t length) mutable | |
{ | |
//if (Report("WriteMessage", ec, "Wrote", length, "bytes")) { | |
if(Report("WriteMeessage",ec)) | |
{ | |
if (messagesent_handler) | |
{ | |
messagesent_handler(std::move(message), this->shared_from_this()); | |
} | |
if (!qMessagesOut.empty()) { | |
// TODO FIXME Race condition? | |
WriteMessage(); | |
} | |
} | |
}); | |
} | |
} | |
void CommitIncoming() | |
{ | |
if (message_handler) | |
{ | |
message_handler(std::make_shared<Message<MsgId>>(std::move(tempInMsg)), this->shared_from_this()); | |
} | |
tempInMsg.body.clear(); | |
tempInMsg.message_header.size = 0; | |
} | |
socket_t<Executor> socket_; | |
int connectionId; | |
Message<MsgId> tempInMsg; | |
Message<MsgId> tempOutMsg; | |
ThreadSafeQueue<MsgPtr<MsgId>> qMessagesOut; | |
std::atomic_bool invalidState = false; | |
std::atomic_bool alreadyDisconnected = false; | |
}; | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include "prerequisites.h" | |
template <typename MsgID> | |
struct Message | |
{ | |
[[nodiscard]] size_t size() const { return body.size(); } | |
MessageHeader<MsgID> message_header{}; | |
std::vector<unsigned char> body; | |
std::string TransactionId; | |
void Append(const char* data) | |
{ | |
size_t i = body.size(); | |
size_t data_length = strlen(data); | |
body.resize(body.size() + sizeof(size_t) + data_length); | |
std::memcpy(body.data() + i, &data_length, sizeof(size_t)); | |
std::memcpy(body.data() + sizeof(size_t) + i, data, data_length); | |
message_header.size = (uint32_t)size(); | |
} | |
void Append(std::string const& data) | |
{ | |
const char* data_cStr = data.c_str(); | |
Append(data_cStr); | |
} | |
void GetString(size_t offset, std::string const& dst) const | |
{ | |
if (body.size() >= offset + sizeof(size_t)) | |
{ | |
size_t length; | |
std::memcpy(&length, body.data() + offset, sizeof(size_t)); | |
if (body.size() >= offset + sizeof(size_t) + length) | |
{ | |
dst.assign(length, body[sizeof(size_t) + offset]); | |
return; | |
} | |
} | |
std::string error = "ERROR"; | |
dst.assign(error); | |
} | |
std::string GetString(size_t offset) const | |
{ | |
if (body.size() >= offset + sizeof(size_t)) | |
{ | |
size_t length; | |
std::memcpy(&length, body.data() + offset, sizeof(size_t)); | |
if (body.size() >= offset + sizeof(size_t) + length) | |
{ | |
std::string s = std::string(body.begin() + offset + sizeof(size_t), body.begin() + offset + sizeof(size_t) + length); | |
return s; | |
} | |
} | |
std::string error = "ERROR"; | |
return error; | |
} | |
int GetInt(size_t offset) const | |
{ | |
int myValue; | |
std::memcpy(body.data() + offset, myValue, sizeof(int)); | |
return myValue; | |
} | |
// Override for std::cout compatibility - produces friendly description of message | |
friend std::ostream& operator << (std::ostream& os, const Message<MsgID>& msg) | |
{ | |
os << "ID:" << int(msg.message_header.id) << " Size:" << msg.message_header.size; | |
return os; | |
} | |
// Convenience Operator overloads - These allow us to add and remove stuff from | |
// the body vector as if it were a stack, so First in, Last Out. These are a | |
// template in itself, because we dont know what data type the user is pushing or | |
// popping, so lets allow them all. NOTE: It assumes the data type is fundamentally | |
// Plain Old Data (POD). TLDR: Serialise & Deserialise into/from a vector | |
// Pushes any POD-like data into the message buffer | |
template<typename DataType> | |
friend Message<MsgID>& operator << (Message<MsgID>& msg, const DataType& data) | |
{ | |
// Check that the type of the data being pushed is trivially copyable | |
static_assert(std::is_standard_layout<DataType>::value, "Data is too complex to be pushed into vector"); | |
// Cache current size of vector, as this will be the point we insert the data | |
size_t i = msg.body.size(); | |
// Resize the vector by the size of the data being pushed | |
msg.body.resize(msg.body.size() + sizeof(DataType)); | |
// Physically copy the data into the newly allocated vector space | |
std::memcpy(msg.body.data() + i, &data, sizeof(DataType)); | |
// Recalculate the message size | |
msg.message_header.size = (uint32_t)msg.size(); | |
// Return the target message so it can be "chained" | |
return msg; | |
} | |
// Pulls any POD-like data form the message buffer | |
template<typename DataType> | |
friend Message<MsgID>& operator >> (Message<MsgID>& msg, DataType& data) | |
{ | |
// Check that the type of the data being pushed is trivially copyable | |
static_assert(std::is_standard_layout<DataType>::value, "Data is too complex to be pulled from vector"); | |
// Cache the location towards the end of the vector where the pulled data starts | |
size_t i = msg.body.size() - sizeof(DataType); | |
// Physically copy the data from the vector into the user variable | |
std::memcpy(&data, msg.body.data() + i, sizeof(DataType)); | |
// Shrink the vector to remove read bytes, and reset end position | |
msg.body.resize(i); | |
// Recalculate the message size | |
msg.message_header.size = msg.size(); | |
// Return the target message so it can be "chained" | |
return msg; | |
} | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include "prerequisites.h" | |
template<typename MsgId> | |
struct MessageHeader | |
{ | |
MsgId id{}; | |
uint32_t size = 0; | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
// Windows stuff. | |
#define _CRT_SECURE_NO_WARNINGS | |
#define NOMINMAX | |
//boost asio | |
#include <boost/asio.hpp> | |
#include <ctime> | |
#include <cstring> | |
#include <chrono> | |
#include <time.h> | |
#include <sstream> | |
#include <string> | |
#include <vector> | |
#include <stdio.h> | |
#include <string.h> | |
#include <assert.h> | |
#include <stdarg.h> | |
#include <iostream> | |
#include <thread> | |
#include <future> | |
#include <algorithm> | |
#include <cstdlib> | |
#include <list> | |
#include <set> | |
#include <mutex> | |
#include <condition_variable> | |
#include <functional> | |
#include <execution> | |
#ifdef _WIN32 | |
#include <ShlObj.h> | |
#include <Shlwapi.h> | |
#endif | |
#include <iostream> | |
#include <ostream> | |
#include <sstream> | |
#include <errno.h> | |
#include <stdint.h> | |
#include <fstream> | |
#include <stdexcept> | |
#include <math.h> | |
#include <random> | |
#include <climits> | |
#include <memory> | |
#include <utility> | |
#include <queue> | |
#include <optional> | |
#include <boost/bind/bind.hpp> | |
#include <boost/shared_ptr.hpp> | |
#include <boost/scoped_ptr.hpp> | |
#include <boost/enable_shared_from_this.hpp> | |
#include <boost/make_shared.hpp> | |
#include "boost/date_time/posix_time/posix_time_types.hpp" | |
#include <boost/thread/mutex.hpp> | |
#include <boost/functional.hpp> | |
#include <boost/function.hpp> | |
#include <boost/thread.hpp> | |
template<typename MsgId, typename Executor> | |
class Connection; | |
template<typename MsgId> | |
struct Message; | |
template<typename MsgId, typename Executor> | |
class Server; | |
template <typename MsgId, typename Executor> | |
using conn_t = Connection<MsgId, Executor>; | |
template <typename MsgId, typename Executor> | |
using ConnPtr = std::shared_ptr<conn_t<MsgId, Executor>>; | |
template <typename MsgId, typename Executor> | |
using WeakConnPtr = std::weak_ptr<conn_t<MsgId, Executor>>; | |
template <typename MsgId> | |
using MsgPtr = std::shared_ptr<Message<MsgId> const>; | |
template <typename MsgId, typename Executor> | |
using base_type = Server<MsgId, Executor>; | |
template <typename Executor> | |
using acceptor_t = boost::asio::basic_socket_acceptor<boost::asio::ip::tcp, Executor>; | |
template <typename Executor> | |
using socket_t = boost::asio::basic_stream_socket<boost::asio::ip::tcp, Executor>; | |
template <typename Executor> | |
using strand_t = boost::asio::strand<Executor>; | |
template<typename MsgId, typename Executor> | |
using ClientDisconnectCallbackType = std::function<void(ConnPtr<MsgId, Executor> const&)>; | |
template<typename MsgId, typename Executor> | |
using ClientMessageCallbackType = std::function<void(MsgPtr <MsgId> const&, ConnPtr<MsgId, Executor> const&)>; | |
template<typename MsgId, typename Executor> | |
using ClientMessageSentCallbackType = std::function<void(MsgPtr <MsgId> const&, ConnPtr<MsgId, Executor> const&)>; | |
using namespace std::chrono_literals; | |
#include "MessageHeader.h" | |
#include "Message.h" | |
#include "ThreadSafeQueue.h" | |
#include "Connection.h" | |
#include "Server.h" | |
#include "Client.h" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Server-Test.cpp : This file contains the 'main' function. Program execution begins and ends there. | |
// | |
#include "prerequisites.h" | |
enum class MessageTypes : uint32_t | |
{ | |
ServerAccept, | |
ServerDeny, | |
ServerPing, | |
MessageAll, | |
SendText, | |
ServerMessage, | |
ServerMessage1, | |
ServerMessage2, | |
ServerMessage3, | |
ServerMessage4, | |
ServerMessage5, | |
ServerMessage6, | |
ServerMessage7, | |
ServerMessage8, | |
ServerMessage9, | |
}; | |
class MyServer : public Server<MessageTypes,boost::asio::any_io_executor> | |
{ | |
public: | |
MyServer(boost::asio::any_io_executor ex, boost::asio::ip::tcp::endpoint ep) : base_type<MessageTypes, boost::asio::any_io_executor>(ex,ep) | |
{ | |
} | |
void OnClientDisconnect(ConnPtr<MessageTypes, boost::asio::any_io_executor> const& client) override | |
{ | |
std::cout << "[ Client " << client->GetId() << " ] Disconnected" << std::endl; | |
} | |
bool OnClientConnect(ConnPtr<MessageTypes, boost::asio::any_io_executor> const& client) override | |
{ | |
std::cout << "[ Client " << client->GetId() << " ] Connected" << std::endl; | |
auto msg = std::make_shared<Message<MessageTypes>>(); | |
msg->message_header.id = MessageTypes::ServerAccept; | |
(*msg) << client->GetId(); | |
client->Send(std::move(msg)); | |
return true; | |
} | |
void OnMessage(MsgPtr<MessageTypes> const& msg, ConnPtr<MessageTypes, boost::asio::any_io_executor> const& client) override | |
{ | |
std::cout << "[ Client " << client->GetId() << " ] "; | |
if (msg->message_header.id == MessageTypes::SendText) | |
{ | |
std::string message = msg->GetString(0); | |
std::cout << " Received Message" << std::endl; | |
message = ""; | |
client->Send(msg);//fire it back to the client | |
} | |
} | |
void OnMessageSent(MsgPtr<MessageTypes> const& msg, [[maybe_unused]] ConnPtr<MessageTypes, boost::asio::any_io_executor> const& client) override | |
{ | |
//std::cout << "[ Client " << msg->remote->GetId() << " ] "; | |
//std::cout << " Sent Message" << std::endl; | |
} | |
}; | |
MyServer* srv; | |
std::atomic_bool stop = false; | |
std::chrono::high_resolution_clock::duration previous_time; | |
boost::asio::thread_pool ctx; | |
int messageCount; | |
boost::asio::high_resolution_timer timer(ctx); | |
void timedBcast(boost::system::error_code e) | |
{ | |
std::chrono::high_resolution_clock::time_point tStart = std::chrono::high_resolution_clock::now(); | |
std::chrono::high_resolution_clock::time_point tPrepared = tStart; | |
if (!e && !stop) | |
{ | |
if (srv != nullptr) | |
{ | |
if (messageCount >= 1000) | |
{ | |
messageCount = 0; | |
} | |
auto msg = std::make_shared<Message<MessageTypes>>(); | |
msg->message_header.id = MessageTypes::MessageAll; | |
msg->Append(std::string(rand() % 102400 + 81920, 'a') + " " + std::to_string(messageCount++)); | |
msg->TransactionId = "Broadcast"; | |
tPrepared = std::chrono::high_resolution_clock::now(); | |
srv->BroadcastMessage(std::move(msg)); | |
auto const tDone = std::chrono::high_resolution_clock::now(); | |
auto const time = tDone - tPrepared; | |
auto const time2 = tDone - tStart; | |
if (time != previous_time && time > 2us) { | |
// timer += time - 6; | |
std::cout << "Broadcast took " << time / 1.0us << "us | " | |
<< time2 / 1us << "us" << std::endl; | |
previous_time = time; | |
} | |
auto time_expire = 30ms - time2; | |
if (time_expire <= 5ms) { | |
time_expire = 30ms; | |
} | |
// Reschedule the timer | |
timer.expires_from_now(time_expire); | |
timer.async_wait(timedBcast); | |
//std::cout << "BROADCAST" << std::endl; | |
} | |
} | |
} | |
int main() | |
{ | |
messageCount = 1; | |
srand(time(NULL)); | |
boost::asio::ip::tcp::endpoint ep{ {}, 40000 }; | |
//boost::asio::thread_pool ctx(32); | |
srv = new MyServer(ctx.get_executor(), ep); | |
std::cout << "Hello World!\n"; | |
timer.expires_from_now(std::chrono::milliseconds(5000)); | |
timer.async_wait(timedBcast); | |
std::string str; | |
std::getline(std::cin, str); | |
srv->interrupt(); | |
stop = true; | |
timer.cancel(); | |
ctx.stop(); | |
ctx.join(); | |
str=""; | |
std::getline(std::cin, str); | |
delete srv; | |
return 0; | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include "prerequisites.h" | |
template <typename MsgId, typename Executor> | |
class Server | |
{ | |
public: | |
Server(Executor executor, boost::asio::ip::tcp::endpoint endpoint) : executor_(executor) | |
{ | |
acceptor_.open(endpoint.protocol()); | |
acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); | |
acceptor_.set_option(boost::asio::ip::tcp::acceptor::do_not_route(true)); | |
acceptor_.set_option(boost::asio::ip::tcp::acceptor::keep_alive(false)); | |
acceptor_.set_option(boost::asio::ip::tcp::acceptor::enable_connection_aborted(false)); | |
acceptor_.set_option(boost::asio::ip::tcp::acceptor::linger(false, 3)); | |
acceptor_.bind(endpoint); | |
acceptor_.listen(); | |
start_accept(); | |
} | |
void interrupt() { | |
shutdownBegan = true; | |
boost::asio::post(strand_, [this] { | |
acceptor_.cancel(); | |
acceptor_.close(); | |
for (const auto& [key, value] : connections) { | |
if (auto conn = value.lock()) | |
{ | |
conn->Disconnect(true, true, true); | |
} | |
} | |
connections.clear(); | |
shutdownCompleted = true; | |
}); | |
} | |
void BroadcastMessage(MsgPtr<MsgId> msg) | |
{ | |
boost::asio::post(strand_, [this, msg = std::move(msg)]{ | |
for (const auto& kvp : connections) | |
{ | |
boost::asio::post(executor_, [this, handle = kvp.second, msg]{ | |
if (auto conn = handle.lock()) | |
{ | |
if (!conn->IsInvalid()) | |
{ | |
conn->Send(std::move(msg)); | |
} | |
} | |
}); | |
} | |
}); | |
} | |
virtual ~Server() = default; | |
bool IsShutDownCompleted() { | |
return shutdownCompleted; | |
} | |
protected: | |
// This server class should override thse functions to implement | |
// customised functionality | |
// Called when a client connects, you can veto the connection by returning false | |
virtual bool OnClientConnect(ConnPtr<MsgId,Executor> const& client) | |
{ | |
return false; | |
} | |
// Called when a client appears to have disconnected | |
virtual void OnClientDisconnect(ConnPtr<MsgId, Executor> const& client) | |
{ | |
} | |
// Called when a message arrives | |
virtual void OnMessage(MsgPtr<MsgId> const& message,ConnPtr<MsgId, Executor> const& client) | |
{ | |
} | |
virtual void OnMessageSent(MsgPtr<MsgId> const& message,ConnPtr<MsgId, Executor> const& client) | |
{ | |
} | |
private: | |
void client_disconnected(ConnPtr<MsgId, Executor> const& client) | |
{ | |
OnClientDisconnect(client); | |
removeConnection(client); | |
} | |
void client_message(MsgPtr<MsgId> const& message, ConnPtr<MsgId, Executor> const& client) | |
{ | |
OnMessage(message,client); | |
} | |
void message_sent(MsgPtr<MsgId> const& message, ConnPtr<MsgId, Executor> const& client) | |
{ | |
OnMessageSent(message,client); | |
} | |
void start_accept() { | |
if (!shutdownBegan && this->acceptor_.is_open()) | |
{ | |
auto new_connection = conn_t<MsgId, Executor>::create( | |
boost::asio::make_strand(executor_), connectionIds++, | |
boost::bind(&Server<MsgId, Executor>::client_message,this, boost::placeholders::_1, boost::placeholders::_2), | |
boost::bind(&Server<MsgId, Executor>::message_sent, this, boost::placeholders::_1, boost::placeholders::_2), | |
boost::bind(&Server<MsgId, Executor>::client_disconnected, this, boost::placeholders::_1) | |
); | |
acceptor_.async_accept(new_connection->socket(), boost::bind(&Server<MsgId, Executor>::handle_accept, this, new_connection, boost::asio::placeholders::error)); | |
} | |
} | |
#pragma endregion | |
void handle_accept(ConnPtr<MsgId, Executor> new_connection, boost::system::error_code ec) | |
{ | |
if (!ec && !shutdownBegan) | |
{ | |
start_accept(); | |
if (OnClientConnect(new_connection)) | |
{ | |
new_connection->accepted(); | |
addConnection(std::move(new_connection)); | |
} | |
} | |
else if (!shutdownBegan) | |
{ | |
start_accept(); | |
} | |
} | |
#pragma region Add Connection | |
void addConnection(ConnPtr<MsgId, Executor> connection) | |
{ | |
if (!shutdownBegan) | |
{ | |
boost::asio::post(strand_, [this, conn = std::move(connection)]()mutable{ | |
connections.emplace(conn->GetId(), std::move(conn)); | |
for (auto it = connections.begin(); it != connections.end();) { | |
if (it->second.expired()) { | |
connections.erase(it); | |
return; | |
} | |
else | |
++it; | |
} | |
}); | |
} | |
} | |
#pragma endregion | |
#pragma region Remove Connection | |
void removeConnectionById(int id) | |
{ | |
if (!shutdownBegan) | |
{ | |
boost::asio::post(strand_, [this, id]()mutable {connections.erase(id); }); | |
} | |
} | |
void removeConnection(ConnPtr<MsgId, Executor> const& connection) | |
{ | |
removeConnectionById(connection->GetId()); | |
} | |
#pragma endregion | |
Executor executor_; | |
strand_t<Executor> strand_=boost::asio::make_strand(executor_); | |
acceptor_t<Executor> acceptor_{ strand_ }; | |
std::map<int, WeakConnPtr<MsgId, Executor>> connections; | |
std::atomic_bool shutdownBegan; | |
std::atomic_bool shutdownCompleted; | |
int connectionIds{ 10000 }; | |
}; | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include "prerequisites.h" | |
template<typename T> | |
class ThreadSafeQueue | |
{ | |
public: | |
ThreadSafeQueue() = default; | |
ThreadSafeQueue(const ThreadSafeQueue<T>&) = delete; | |
ThreadSafeQueue& operator=(const ThreadSafeQueue<T>&) = delete; | |
virtual ~ThreadSafeQueue() { clear(); } | |
// Returns and maintains item at front of Queue | |
const T& front() | |
{ | |
std::scoped_lock lock(muxQueue); | |
return deqQueue.front(); | |
} | |
// Returns and maintains item at back of Queue | |
const T& back() | |
{ | |
std::scoped_lock lock(muxQueue); | |
return deqQueue.back(); | |
} | |
// Removes and returns item from front of Queue | |
T pop_front() | |
{ | |
std::scoped_lock lock(muxQueue); | |
auto t = std::move(deqQueue.front()); | |
deqQueue.pop_front(); | |
return t; | |
} | |
// Removes and returns item from back of Queue | |
T pop_back() | |
{ | |
std::scoped_lock lock(muxQueue); | |
auto t = std::move(deqQueue.back()); | |
deqQueue.pop_back(); | |
return t; | |
} | |
// Adds an item to back of Queue | |
void push_back(const T& item) | |
{ std::scoped_lock lock(muxQueue); | |
deqQueue.emplace_back(std::move(item)); | |
} | |
// Adds an item to front of Queue | |
void push_front(const T& item) | |
{ | |
std::scoped_lock lock(muxQueue); | |
deqQueue.emplace_front(std::move(item)); | |
} | |
// Returns true if Queue has no items | |
bool empty() | |
{ | |
std::scoped_lock lock(muxQueue); | |
return deqQueue.empty(); | |
} | |
// Returns number of items in Queue | |
size_t count() | |
{ | |
std::scoped_lock lock(muxQueue); | |
return deqQueue.size(); | |
} | |
// Clears Queue | |
void clear() | |
{ | |
std::scoped_lock lock(muxQueue); | |
deqQueue.clear(); | |
} | |
protected: | |
std::mutex muxQueue; | |
std::deque<T> deqQueue; | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment