Skip to content

Instantly share code, notes, and snippets.

Last active January 24, 2024 12:42
Show Gist options
  • Save vc8bp/8689ce94c71926d38ef70b4987ca26f3 to your computer and use it in GitHub Desktop.
Save vc8bp/8689ce94c71926d38ef70b4987ca26f3 to your computer and use it in GitHub Desktop.
#include <iostream>
#include <boost/asio.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <thread>
//#include "queue.h"
#include "tokenMap.hpp"
#include <unordered_map>
#include <unordered_set>
#include <chrono>
#include "queue.h"
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <mutex>
#include <condition_variable>
// File-wide shorthand: every boost::asio name is visible unqualified, and
// `tcp` abbreviates boost::asio::ip::tcp.
using namespace boost::asio;
using tcp = boost::asio::ip::tcp;
// Declared here, defined in another translation unit; not referenced by the
// code visible in this listing.
// NOTE(review): `volatile` does not provide inter-thread synchronization; if
// this is meant as a cross-thread stop flag, std::atomic<bool> is the
// appropriate type — confirm at the definition site.
extern volatile bool shouldStop;
// Outgoing message source: broadCastFeed() takes one message per iteration
// via dequeue().
extern ThreadSafeQueue<std::string> messageQueue;
// server() calls printAll() on this for each new connection.
extern ThreadSafeMap<std::string> messageMap;
//std::vector<boost::beast::websocket::stream<tcp::socket>> clients;
// Connected WebSocket streams, owned by this vector. readMessagesThread() and
// broadCastFeed() index into it and erase entries when an operation throws.
std::vector<std::unique_ptr<boost::beast::websocket::stream<tcp::socket>>> clients;
// Mutex locked around erases from `clients` and inside the per-connection
// thread in server(). The identifier is misspelled ("clietns") but is used
// under this name throughout the file.
std::mutex clietnsMutex;
// Not used by any code visible in this listing.
std::condition_variable clientsCV;
// Token set, its mutex and condition variable: referenced only from the
// commented-out filtering code inside broadCastFeed().
std::unordered_set<int> uniqueTokens;
std::mutex uniqueTokensMutex;
std::condition_variable uniqueTokensCV;
// Forward declaration; the definition is not part of this listing. Only the
// commented-out filtering code in broadCastFeed() calls it.
boost::property_tree::ptree parseJson(const std::string& jsonStr);
void readMessagesThread() {
while (true) {
if (clients.empty()) continue;
int i = 0;
try {
for (i = 0; i < clients.size(); ++i) {
boost::beast::flat_buffer buffer;
auto received_message = boost::beast::buffers_to_string(buffer.cdata());
std::cout << received_message << std::endl;
catch (...) {
std::cerr << "Caught unknown exception" << i << std::endl;
if (!clients.empty()) {
std::lock_guard<std::mutex> guard(clietnsMutex);
clients.erase(clients.begin() + i);
//void readMessagesThread() {
// while (true) {
// if (clients.empty()) continue;
// int i = 0;
// try {
// for (i = 0; i < clients.size(); ++i) {
// boost::beast::flat_buffer buffer;
// auto& client = clients[i];
// // Attempt to read a message asynchronously
// client->async_read(
// buffer,
// [i, &buffer, &client](const boost::system::error_code& ec, std::size_t /*bytes_transferred*/) {
// if (!ec) {
// // Successfully received message, print it to the console
// auto received_message = boost::beast::buffers_to_string(buffer.cdata());
// std::cout << received_message << std::endl;
// // Continue reading messages for this client
// readMessagesThread();
// }
// else {
// // Handle error or client disconnect
// std::cerr << "Error reading from client: " << ec.message() << std::endl;
// std::lock_guard<std::mutex> guard(clietnsMutex);
// clients.erase(clients.begin() + i);
// return;
// }
// }
// );
// }
// }
// catch (...) {
// std::cerr << "Caught unknown exception" << i << std::endl;
// if (!clients.empty()) {
// std::lock_guard<std::mutex> guard(clietnsMutex);
// clients.erase(clients.begin() + i);
// --i;
// }
// }
// }
void broadCastFeed() {
while (true) {
std::string message = messageQueue.dequeue();
if (clients.empty()) continue;
int i = 0;
try {
for (i = 0; i < clients.size(); i++) {
//for (auto& client : clients) {
//boost::property_tree::ptree pt = parseJson(message);
//int token_id = pt.get<int>("token");
// std::unique_lock<std::mutex> lock(uniqueTokensMutex);
// if (uniqueTokens.find(token_id) != uniqueTokens.end()) {
// client.write(boost::asio::buffer(message));
// }
catch (...) {
std::cerr << "Caught unknown exception" << i << std::endl;
if (i < clients.size()) {
std::lock_guard<std::mutex> guard(clietnsMutex);
clients.erase(clients.begin() + i);
// Entry point of the WebSocket server.
//   ip_address  textual address, parsed with boost::asio::ip::make_address
//   PORT        decimal port string, converted with std::atoi and narrowed to
//               unsigned short (no validation is performed on the result)
// Builds a TCP acceptor on that endpoint, starts the reader and broadcaster
// threads, then loops forever creating one socket per iteration and handing
// it to a detached per-connection thread.
// The trailing `return 0;` follows a `while (true)` loop.
//
// NOTE(review): this listing appears to have lost lines. There are no closing
// braces for the loop or the function, and no visible acceptor accept call,
// WebSocket handshake, send of `received_message`, or insertion into
// `clients`. Restore those from the original source before building.
int server(const char* ip_address, const char* PORT) {
auto const address = boost::asio::ip::make_address(ip_address);
auto const port = static_cast<unsigned short>(std::atoi(PORT));
// Concurrency hint of 1 is passed to the io_context.
boost::asio::io_context ioc{ 1 };
tcp::acceptor accepter{ ioc, {address, port} };
std::cout << "WS Server is Listening on ws://" << ip_address << ":" << PORT << std::endl;
// Both workers run infinite loops; no join or detach for them is visible in
// this listing.
std::thread readThread(readMessagesThread);
std::thread broadCastThread(broadCastFeed);
while (true) {
tcp::socket socket{ ioc };
// NOTE(review): as listed, nothing between the socket's construction and this
// log line actually accepts a connection — presumably a dropped line.
std::cout << "Client Connected!! \n";
// The socket is moved into the lambda, which is why the lambda is `mutable`.
std::thread{ [q = std::move(socket)]() mutable {
boost::beast::websocket::stream<tcp::socket> ws {std::move(q)};
// Output of messageMap.printAll(); no use of this string is visible here —
// presumably it is sent to the new client, TODO confirm.
std::string received_message = messageMap.printAll();
// Lock taken at the end of the lambda, presumably to guard an insertion into
// `clients` that is missing from this listing — TODO confirm.
std::lock_guard<std::mutex> guard(clietnsMutex);
} }.detach();
return 0;
Copy link

vc8bp commented Dec 6, 2023


Copy link

vc8bp commented Jan 24, 2024


Copy link

vc8bp commented Jan 24, 2024


Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment