Skip to content

Instantly share code, notes, and snippets.

@vc8bp
Last active January 24, 2024 12:42
Show Gist options
  • Save vc8bp/8689ce94c71926d38ef70b4987ca26f3 to your computer and use it in GitHub Desktop.
Save vc8bp/8689ce94c71926d38ef70b4987ca26f3 to your computer and use it in GitHub Desktop.
#include <iostream>
#include <boost/asio.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <thread>
//#include "queue.h"
#include "tokenMap.hpp"
#include <unordered_map>
#include <unordered_set>
#include <chrono>
#include "queue.h"
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <mutex>
#include <condition_variable>
using namespace boost::asio;
using tcp = boost::asio::ip::tcp;
extern volatile bool shouldStop;
extern ThreadSafeQueue<std::string> messageQueue;
extern ThreadSafeMap<std::string> messageMap;
//std::vector<boost::beast::websocket::stream<tcp::socket>> clients;
std::vector<std::unique_ptr<boost::beast::websocket::stream<tcp::socket>>> clients;
std::mutex clietnsMutex;
std::condition_variable clientsCV;
std::unordered_set<int> uniqueTokens;
std::mutex uniqueTokensMutex;
std::condition_variable uniqueTokensCV;
boost::property_tree::ptree parseJson(const std::string& jsonStr);
void readMessagesThread() {
while (true) {
if (clients.empty()) continue;
int i = 0;
try {
for (i = 0; i < clients.size(); ++i) {
boost::beast::flat_buffer buffer;
clients[i]->read(buffer);
auto received_message = boost::beast::buffers_to_string(buffer.cdata());
std::cout << received_message << std::endl;
}
}
catch (...) {
std::cerr << "Caught unknown exception" << i << std::endl;
if (!clients.empty()) {
std::lock_guard<std::mutex> guard(clietnsMutex);
clients.erase(clients.begin() + i);
--i;
}
}
}
}
//void readMessagesThread() {
// while (true) {
// if (clients.empty()) continue;
//
// int i = 0;
// try {
// for (i = 0; i < clients.size(); ++i) {
// boost::beast::flat_buffer buffer;
// auto& client = clients[i];
//
// // Attempt to read a message asynchronously
// client->async_read(
// buffer,
// [i, &buffer, &client](const boost::system::error_code& ec, std::size_t /*bytes_transferred*/) {
// if (!ec) {
// // Successfully received message, print it to the console
// auto received_message = boost::beast::buffers_to_string(buffer.cdata());
// std::cout << received_message << std::endl;
//
// // Continue reading messages for this client
// readMessagesThread();
// }
// else {
// // Handle error or client disconnect
// std::cerr << "Error reading from client: " << ec.message() << std::endl;
//
// std::lock_guard<std::mutex> guard(clietnsMutex);
// clients.erase(clients.begin() + i);
//
// return;
// }
// }
// );
// }
// }
// catch (...) {
// std::cerr << "Caught unknown exception" << i << std::endl;
// if (!clients.empty()) {
// std::lock_guard<std::mutex> guard(clietnsMutex);
// clients.erase(clients.begin() + i);
// --i;
// }
//
// }
// }
//}
void broadCastFeed() {
while (true) {
std::string message = messageQueue.dequeue();
if (clients.empty()) continue;
int i = 0;
try {
for (i = 0; i < clients.size(); i++) {
clients[i]->write(boost::asio::buffer(message));
}
//for (auto& client : clients) {
//client.write(boost::asio::buffer(message));
//boost::property_tree::ptree pt = parseJson(message);
//int token_id = pt.get<int>("token");
//{
// std::unique_lock<std::mutex> lock(uniqueTokensMutex);
// if (uniqueTokens.find(token_id) != uniqueTokens.end()) {
// client.write(boost::asio::buffer(message));
// }
//}
//}
}
catch (...) {
std::cerr << "Caught unknown exception" << i << std::endl;
if (i < clients.size()) {
std::lock_guard<std::mutex> guard(clietnsMutex);
clients.erase(clients.begin() + i);
--i;
}
}
}
}
int server(const char* ip_address, const char* PORT) {
auto const address = boost::asio::ip::make_address(ip_address);
auto const port = static_cast<unsigned short>(std::atoi(PORT));
boost::asio::io_context ioc{ 1 };
tcp::acceptor accepter{ ioc, {address, port} };
std::cout << "WS Server is Listening on ws://" << ip_address << ":" << PORT << std::endl;
std::thread readThread(readMessagesThread);
std::thread broadCastThread(broadCastFeed);
while (true) {
tcp::socket socket{ ioc };
accepter.accept(socket);
std::cout << "Client Connected!! \n";
std::thread{ [q = std::move(socket)]() mutable {
boost::beast::websocket::stream<tcp::socket> ws {std::move(q)};
ws.accept();
std::string received_message = messageMap.printAll();
ws.write(boost::asio::buffer(received_message));
//clients.push_back(std::move(ws));
std::lock_guard<std::mutex> guard(clietnsMutex);
clients.push_back(std::make_unique<boost::beast::websocket::stream<tcp::socket>>(std::move(ws)));
} }.detach();
}
return 0;
}
@vc8bp
Copy link
Author

vc8bp commented Dec 6, 2023

test

@vc8bp
Copy link
Author

vc8bp commented Jan 24, 2024

test

@vc8bp
Copy link
Author

vc8bp commented Jan 24, 2024

e

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment