Skip to content

Instantly share code, notes, and snippets.

@mtrencseni
Created April 8, 2023 14:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mtrencseni/bc0f5dad14cd3c5c65fb78e66a228e9b to your computer and use it in GitHub Desktop.
Save mtrencseni/bc0f5dad14cd3c5c65fb78e66a228e9b to your computer and use it in GitHub Desktop.
#ifndef __MESSAGEQUEUE_HPP__
#define __MESSAGEQUEUE_HPP__
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <deque>
#include <asio.hpp>
#include "Utils.hpp"
using asio::awaitable;
using asio::ip::tcp;
using asio::use_awaitable;
using namespace std;
class Client
{
tcp::socket& socket;
public:
Client(tcp::socket& s) : socket(s) {}
void Write(const Dict& msg)
{
auto s = serialize_dict(msg) + "\r\n";
asio::write(socket, asio::buffer(s));
}
};
class MessageQueue
{
const unsigned short cache_length = 100;
unordered_map<string, unordered_set<Client*>> subscribers;
unordered_map<Client*, unordered_set<string>> subscriptions;
unordered_map<string, deque<Dict>> cache;
unordered_map<string, long> max_index;
public:
void OnConnect(Client& client)
{
}
void OnDisconnect(Client& client)
{
for (const string& topic : subscriptions[&client])
subscribers[topic].erase(&client);
}
void OnMessage(Client& client, string& line)
{
Dict msg = parse_dict(line);
if (msg["command"] == "subscribe")
OnSubscribe(client, msg);
else
OnSend(client, msg);
}
void OnSubscribe(Client& client, Dict& msg)
{
long last_seen;
subscribers[msg["topic"]].insert(&client);
subscriptions[&client].insert(msg["topic"]);
if (msg["last_seen"] == "")
last_seen = -1;
else
last_seen = stoi(msg["last_seen"]);
CachePlayback(client, msg, last_seen);
}
void OnSend(Client& client, Dict& msg)
{
AddIndex(msg);
if (msg["delivery"] == "")
msg["delivery"] = "all";
for (auto& client : subscribers[msg["topic"]])
{
client->Write(msg);
if (msg["delivery"] == "one")
break;
}
if ((subscribers[msg["topic"]].size() == 0) || (msg["delivery"] == "all"))
CachePush(msg);
}
private:
void AddIndex(Dict& msg)
{
msg["index"] = to_string(max_index[msg["topic"]]);
max_index[msg["topic"]] += 1;
}
void CachePush(Dict& msg)
{
cache[msg["topic"]].push_back(msg);
if (cache[msg["topic"]].size() > cache_length)
cache[msg["topic"]].pop_front();
}
void CachePlayback(Client& client, Dict& msg, long last_seen)
{
bool reCache = false;
for (auto& cached_msg : cache[msg["topic"]])
{
if (stoi(cached_msg["index"]) > last_seen)
{
client.Write(cached_msg);
if (cached_msg["delivery"] == "one")
reCache = true;
}
}
if (reCache)
{
deque<Dict> newCache;
for (auto& cached_msg : cache[msg["topic"]])
if ((stoi(cached_msg["index"]) <= last_seen) || (cached_msg["delivery"] == "all"))
newCache.push_back(cached_msg);
cache[msg["topic"]] = newCache;
}
}
};
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment