Last active
July 13, 2023 11:43
-
-
Save mtrencseni/29bfaf9b261167365773c6044a2fa321 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <map> | |
#include <regex> | |
#include <iostream> | |
#include <sstream> | |
#include <vector> | |
#include <string> | |
#include <unordered_map> | |
#include <unordered_set> | |
#include <algorithm> | |
#include <cctype> | |
#include <asio.hpp> | |
#include <asio/io_context.hpp> | |
#include <signal.h> | |
using asio::awaitable; | |
using asio::co_spawn; | |
using asio::detached; | |
using asio::ip::tcp; | |
using asio::use_awaitable; | |
using asio::io_context; | |
using namespace std; | |
void trim(string& str) | |
{ | |
auto is_space = [](char c) { return std::isspace(static_cast<unsigned char>(c)); }; | |
str.erase(str.begin(), std::find_if(str.begin(), str.end(), std::not_fn(is_space))); | |
str.erase(std::find_if(str.rbegin(), str.rend(), std::not_fn(is_space)).base(), str.end()); | |
} | |
bool startswith(string_view s, string_view prefix) | |
{ | |
return s.find(prefix, 0) == 0; | |
} | |
vector<string> split(const string& s, char delim) { | |
vector<string> result; | |
stringstream ss(s); | |
string item; | |
while (getline(ss, item, delim)) | |
result.push_back(item); | |
return result; | |
} | |
typedef map<string, string> Dict; | |
Dict parse_dict(const string& dict_str) { | |
Dict result; | |
regex dict_pattern(R"(\s*'([^']+)'\s*:\s*'([^']+)'\s*)"); | |
smatch matches; | |
auto search_start = dict_str.begin(); | |
while (regex_search(search_start, dict_str.end(), matches, dict_pattern)) { | |
result[matches[1].str()] = matches[2].str(); | |
search_start = matches[0].second; | |
} | |
return result; | |
} | |
awaitable<void> session(tcp::socket socket) | |
{ | |
try | |
{ | |
string line; | |
static unordered_map<string, unordered_set< tcp::socket* >> subscribers; | |
unordered_set<string> subscriptions; | |
Dict mesg; | |
cout << "Client connected: " << socket.remote_endpoint() << '\n'; | |
while (line != "quit") | |
{ | |
line.clear(); | |
co_await asio::async_read_until(socket, asio::dynamic_buffer(line), '\n', use_awaitable); // line-by-line reading | |
//trim(line); | |
mesg = parse_dict(line); | |
if (line != "") | |
{ | |
//if (startswith(line, "subscribe ")) | |
if (mesg["command"] == "subscribe") | |
{ | |
//string topic = split(line, ' ')[1]; | |
subscribers[mesg["topic"]].insert(&socket); | |
subscriptions.insert(mesg["topic"]); | |
} | |
else if (mesg["command"] == "send") | |
{ | |
/*string topic = split(line, ' ')[1]; | |
auto pos = line.find("\"", 0); | |
auto from = pos + 1; | |
auto count = line.length() - (pos + 2); | |
string message = line.substr(from, count); | |
message.append("\r\n");*/ | |
for (auto& subscriber : subscribers[mesg["topic"]]) | |
co_await asio::async_write(*subscriber, asio::buffer(line), use_awaitable); | |
} | |
} | |
} | |
for (const string& topic : subscriptions) | |
subscribers[topic].erase(&socket); | |
cout << "Client disconnected: " << socket.remote_endpoint() << '\n'; | |
} | |
catch (const std::exception& e) | |
{ | |
cout << "Exception in session: " << e.what() << '\n'; | |
} | |
} | |
awaitable<void> listener(io_context& ctx, unsigned short port) | |
{ | |
tcp::acceptor acceptor(ctx, { tcp::v4(), port }); | |
cout << "Server listening on port " << port << "..." << endl; | |
while (true) | |
{ | |
tcp::socket socket = co_await acceptor.async_accept(use_awaitable); | |
co_spawn(ctx, session(move(socket)), detached); | |
} | |
} | |
int main(int argc, char* argv[]) | |
{ | |
if (argc < 2) | |
{ | |
cerr << "Usage: " << argv[0] << " <port>" << endl; | |
return 1; | |
} | |
io_context ctx; | |
string arg = argv[1]; | |
size_t pos; | |
unsigned short port = stoi(arg, &pos); | |
asio::signal_set signals(ctx, SIGINT, SIGTERM); | |
signals.async_wait([&](auto, auto) { ctx.stop(); }); | |
auto listen = listener(ctx, port); | |
co_spawn(ctx, move(listen), asio::detached); | |
ctx.run(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment