Skip to content

Instantly share code, notes, and snippets.

@mtrencseni
Last active July 13, 2023 11:43
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mtrencseni/29bfaf9b261167365773c6044a2fa321 to your computer and use it in GitHub Desktop.
Save mtrencseni/29bfaf9b261167365773c6044a2fa321 to your computer and use it in GitHub Desktop.
#include <map>
#include <regex>
#include <iostream>
#include <sstream>
#include <vector>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <algorithm>
#include <cctype>
#include <asio.hpp>
#include <asio/io_context.hpp>
#include <signal.h>
using asio::awaitable;
using asio::co_spawn;
using asio::detached;
using asio::ip::tcp;
using asio::use_awaitable;
using asio::io_context;
using namespace std;
void trim(string& str)
{
auto is_space = [](char c) { return std::isspace(static_cast<unsigned char>(c)); };
str.erase(str.begin(), std::find_if(str.begin(), str.end(), std::not_fn(is_space)));
str.erase(std::find_if(str.rbegin(), str.rend(), std::not_fn(is_space)).base(), str.end());
}
bool startswith(string_view s, string_view prefix)
{
return s.find(prefix, 0) == 0;
}
vector<string> split(const string& s, char delim) {
vector<string> result;
stringstream ss(s);
string item;
while (getline(ss, item, delim))
result.push_back(item);
return result;
}
typedef map<string, string> Dict;
Dict parse_dict(const string& dict_str) {
Dict result;
regex dict_pattern(R"(\s*'([^']+)'\s*:\s*'([^']+)'\s*)");
smatch matches;
auto search_start = dict_str.begin();
while (regex_search(search_start, dict_str.end(), matches, dict_pattern)) {
result[matches[1].str()] = matches[2].str();
search_start = matches[0].second;
}
return result;
}
awaitable<void> session(tcp::socket socket)
{
try
{
string line;
static unordered_map<string, unordered_set< tcp::socket* >> subscribers;
unordered_set<string> subscriptions;
Dict mesg;
cout << "Client connected: " << socket.remote_endpoint() << '\n';
while (line != "quit")
{
line.clear();
co_await asio::async_read_until(socket, asio::dynamic_buffer(line), '\n', use_awaitable); // line-by-line reading
//trim(line);
mesg = parse_dict(line);
if (line != "")
{
//if (startswith(line, "subscribe "))
if (mesg["command"] == "subscribe")
{
//string topic = split(line, ' ')[1];
subscribers[mesg["topic"]].insert(&socket);
subscriptions.insert(mesg["topic"]);
}
else if (mesg["command"] == "send")
{
/*string topic = split(line, ' ')[1];
auto pos = line.find("\"", 0);
auto from = pos + 1;
auto count = line.length() - (pos + 2);
string message = line.substr(from, count);
message.append("\r\n");*/
for (auto& subscriber : subscribers[mesg["topic"]])
co_await asio::async_write(*subscriber, asio::buffer(line), use_awaitable);
}
}
}
for (const string& topic : subscriptions)
subscribers[topic].erase(&socket);
cout << "Client disconnected: " << socket.remote_endpoint() << '\n';
}
catch (const std::exception& e)
{
cout << "Exception in session: " << e.what() << '\n';
}
}
awaitable<void> listener(io_context& ctx, unsigned short port)
{
tcp::acceptor acceptor(ctx, { tcp::v4(), port });
cout << "Server listening on port " << port << "..." << endl;
while (true)
{
tcp::socket socket = co_await acceptor.async_accept(use_awaitable);
co_spawn(ctx, session(move(socket)), detached);
}
}
int main(int argc, char* argv[])
{
if (argc < 2)
{
cerr << "Usage: " << argv[0] << " <port>" << endl;
return 1;
}
io_context ctx;
string arg = argv[1];
size_t pos;
unsigned short port = stoi(arg, &pos);
asio::signal_set signals(ctx, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto) { ctx.stop(); });
auto listen = listener(ctx, port);
co_spawn(ctx, move(listen), asio::detached);
ctx.run();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment