Skip to content

Instantly share code, notes, and snippets.

@mtrencseni
Created April 15, 2023 17:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mtrencseni/956bded2f9475cd222e85a5cceee690f to your computer and use it in GitHub Desktop.
Save mtrencseni/956bded2f9475cd222e85a5cceee690f to your computer and use it in GitHub Desktop.
#include <asio.hpp>
#include <iostream>
#include <unordered_map>
#include <vector>
#include <deque>
#include <string>
#include "json.hpp"
using asio::ip::tcp;
using json = nlohmann::json;
struct Subscriber {
tcp::socket socket;
asio::streambuf buffer;
std::string last_seen;
explicit Subscriber(tcp::socket&& socket)
: socket(std::move(socket)) {}
};
class MessageQueueServer {
public:
MessageQueueServer(asio::io_context& io_context, int port)
: acceptor_(io_context, tcp::endpoint(tcp::v4(), port)) {
do_accept();
}
private:
void do_accept() {
acceptor_.async_accept(
[this](std::error_code ec, tcp::socket socket) {
if (!ec) {
auto subscriber = std::make_shared<Subscriber>(std::move(socket));
read_command(subscriber);
}
do_accept();
});
}
void read_command(std::shared_ptr<Subscriber> subscriber) {
asio::async_read_until(subscriber->socket, subscriber->buffer, '\n',
[this, subscriber](std::error_code ec, std::size_t) {
if (!ec) {
std::istream is(&subscriber->buffer);
json command;
is >> command;
subscriber->buffer.consume(subscriber->buffer.size());
handle_command(subscriber, command);
read_command(subscriber);
}
});
}
void handle_command(std::shared_ptr<Subscriber> subscriber, const json& command) {
std::string cmd = command["command"];
std::string topic = command["topic"];
if (cmd == "subscribe") {
subscribers_[topic].push_back(subscriber);
if (command.contains("last_seen")) {
subscriber->last_seen = command["last_seen"];
}
replay_messages(subscriber, topic);
}
else if (cmd == "send") {
std::string msg = command["msg"];
std::string delivery = command["delivery"];
cache_message(topic, command);
if (delivery == "all") {
for (const auto& sub : subscribers_[topic]) {
send_message(sub, command);
}
}
else if (delivery == "one") {
if (!subscribers_[topic].empty()) {
send_message(subscribers_[topic].front(), command);
}
}
}
}
void cache_message(const std::string& topic, const json& command) {
auto& messages = message_cache_[topic];
json cached_msg = command;
cached_msg["id"] = next_message_id_++;
messages.push_back(cached_msg);
if (messages.size() > max_messages_per_topic_) {
messages.pop_front();
}
}
void replay_messages(std::shared_ptr<Subscriber> subscriber,
const std::string& topic) {
for (const auto& msg : message_cache_[topic]) {
if (subscriber->last_seen.empty() || msg["id"] > subscriber->last_seen) {
send_message(subscriber, msg);
}
}
}
void send_message(std::shared_ptr<Subscriber> subscriber, const json& message) {
std::string msg = message.dump() + "\n";
asio::async_write(subscriber->socket, asio::buffer(msg),
[](std::error_code, std::size_t) {});
}
private:
tcp::acceptor acceptor_;
std::unordered_map<std::string, std::vector<std::shared_ptr<Subscriber>>> subscribers_;
std::unordered_map<std::string, std::deque<json>> message_cache_;
std::size_t next_message_id_ = 1;
const std::size_t max_messages_per_topic_ = 100;
asio::streambuf buffer_;
};
int main(int argc, char* argv[]) {
if (argc != 2) {
std::cerr << "Usage: message_queue_server <port>\n";
return 1;
}
int port = std::stoi(argv[1]);
try {
asio::io_context io_context;
MessageQueueServer server(io_context, port);
io_context.run();
}
catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment