Created
April 15, 2023 17:46
-
-
Save mtrencseni/956bded2f9475cd222e85a5cceee690f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <asio.hpp> | |
#include <iostream> | |
#include <unordered_map> | |
#include <vector> | |
#include <deque> | |
#include <string> | |
#include "json.hpp" | |
using asio::ip::tcp; | |
using json = nlohmann::json; | |
struct Subscriber { | |
tcp::socket socket; | |
asio::streambuf buffer; | |
std::string last_seen; | |
explicit Subscriber(tcp::socket&& socket) | |
: socket(std::move(socket)) {} | |
}; | |
class MessageQueueServer { | |
public: | |
MessageQueueServer(asio::io_context& io_context, int port) | |
: acceptor_(io_context, tcp::endpoint(tcp::v4(), port)) { | |
do_accept(); | |
} | |
private: | |
void do_accept() { | |
acceptor_.async_accept( | |
[this](std::error_code ec, tcp::socket socket) { | |
if (!ec) { | |
auto subscriber = std::make_shared<Subscriber>(std::move(socket)); | |
read_command(subscriber); | |
} | |
do_accept(); | |
}); | |
} | |
void read_command(std::shared_ptr<Subscriber> subscriber) { | |
asio::async_read_until(subscriber->socket, subscriber->buffer, '\n', | |
[this, subscriber](std::error_code ec, std::size_t) { | |
if (!ec) { | |
std::istream is(&subscriber->buffer); | |
json command; | |
is >> command; | |
subscriber->buffer.consume(subscriber->buffer.size()); | |
handle_command(subscriber, command); | |
read_command(subscriber); | |
} | |
}); | |
} | |
void handle_command(std::shared_ptr<Subscriber> subscriber, const json& command) { | |
std::string cmd = command["command"]; | |
std::string topic = command["topic"]; | |
if (cmd == "subscribe") { | |
subscribers_[topic].push_back(subscriber); | |
if (command.contains("last_seen")) { | |
subscriber->last_seen = command["last_seen"]; | |
} | |
replay_messages(subscriber, topic); | |
} | |
else if (cmd == "send") { | |
std::string msg = command["msg"]; | |
std::string delivery = command["delivery"]; | |
cache_message(topic, command); | |
if (delivery == "all") { | |
for (const auto& sub : subscribers_[topic]) { | |
send_message(sub, command); | |
} | |
} | |
else if (delivery == "one") { | |
if (!subscribers_[topic].empty()) { | |
send_message(subscribers_[topic].front(), command); | |
} | |
} | |
} | |
} | |
void cache_message(const std::string& topic, const json& command) { | |
auto& messages = message_cache_[topic]; | |
json cached_msg = command; | |
cached_msg["id"] = next_message_id_++; | |
messages.push_back(cached_msg); | |
if (messages.size() > max_messages_per_topic_) { | |
messages.pop_front(); | |
} | |
} | |
void replay_messages(std::shared_ptr<Subscriber> subscriber, | |
const std::string& topic) { | |
for (const auto& msg : message_cache_[topic]) { | |
if (subscriber->last_seen.empty() || msg["id"] > subscriber->last_seen) { | |
send_message(subscriber, msg); | |
} | |
} | |
} | |
void send_message(std::shared_ptr<Subscriber> subscriber, const json& message) { | |
std::string msg = message.dump() + "\n"; | |
asio::async_write(subscriber->socket, asio::buffer(msg), | |
[](std::error_code, std::size_t) {}); | |
} | |
private: | |
tcp::acceptor acceptor_; | |
std::unordered_map<std::string, std::vector<std::shared_ptr<Subscriber>>> subscribers_; | |
std::unordered_map<std::string, std::deque<json>> message_cache_; | |
std::size_t next_message_id_ = 1; | |
const std::size_t max_messages_per_topic_ = 100; | |
asio::streambuf buffer_; | |
}; | |
int main(int argc, char* argv[]) { | |
if (argc != 2) { | |
std::cerr << "Usage: message_queue_server <port>\n"; | |
return 1; | |
} | |
int port = std::stoi(argv[1]); | |
try { | |
asio::io_context io_context; | |
MessageQueueServer server(io_context, port); | |
io_context.run(); | |
} | |
catch (std::exception& e) { | |
std::cerr << "Exception: " << e.what() << "\n"; | |
} | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment