Skip to content

Instantly share code, notes, and snippets.

@svyatogor
Last active January 31, 2022 01:17
Show Gist options
  • Save svyatogor/6356dc1b4bea0033c086f7be2bd77843 to your computer and use it in GitHub Desktop.
Save svyatogor/6356dc1b4bea0033c086f7be2bd77843 to your computer and use it in GitHub Desktop.
AMQP-CPP Boost Asio handler
#pragma once
#include <amqpcpp/linux_tcp.h>
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/posix/stream_descriptor.hpp>
#include <boost/foreach.hpp>
using namespace std;
class LibBoostAsioHandler : public virtual AMQP::TcpHandler {
private:
boost::asio::io_context &_iocontext;
protected:
struct SocketEvent {
shared_ptr<boost::asio::posix::stream_descriptor> socket;
int flags;
};
class Watcher : public virtual enable_shared_from_this<Watcher> {
private:
uint16_t _heartbeat_interval{0};
shared_ptr<boost::asio::deadline_timer> _timer;
boost::asio::io_context &_iocontext;
boost::asio::io_context::strand _strand;
map<int, SocketEvent> _sockets;
AMQP::TcpConnection *_connection;
void poll_read(int fd) {
auto it = _sockets.find(fd);
if (it == _sockets.end() || !(it->second.flags & AMQP::readable)) {
return;
}
auto handler = [&, self = shared_from_this(),
fd](const boost::system::error_code &error) {
if (!error) {
_connection->process(fd, AMQP::readable);
poll_read(fd);
}
};
it->second.socket->async_wait(
boost::asio::posix::stream_descriptor::wait_read,
_strand.wrap(handler));
}
void poll_write(int fd) {
auto it = _sockets.find(fd);
if (it == _sockets.end() || !(it->second.flags & AMQP::writable)) {
return;
}
auto handler = [&, self = shared_from_this(),
fd](const boost::system::error_code &error) {
if (!error) {
_connection->process(fd, AMQP::writable);
poll_write(fd);
}
};
it->second.socket->async_wait(
boost::asio::posix::stream_descriptor::wait_write,
_strand.wrap(handler));
}
void start_timer() {
if (_heartbeat_interval == 0)
return;
auto tick = [&, self = shared_from_this()](
const boost::system::error_code &error) {
if (!error) {
spdlog::debug("AMQP Heartbeat...");
_connection->heartbeat();
start_timer();
}
};
_timer->expires_at(_timer->expires_at() +
boost::posix_time::seconds(_heartbeat_interval));
_timer->async_wait(_strand.wrap(tick));
}
public:
Watcher(boost::asio::io_context &io_context,
AMQP::TcpConnection *connection)
: _strand(io_context), _iocontext(io_context) {
_connection = connection;
}
Watcher(Watcher &&that) = delete;
Watcher(const Watcher &that) = delete;
~Watcher() {}
void stop() {
BOOST_FOREACH (auto &e, _sockets) {
e.second.socket->cancel();
e.second.socket->release();
}
_sockets.clear();
_heartbeat_interval = 0;
if (_timer.get() != nullptr) {
_timer->cancel();
}
}
void events(int fd, int monitor_events) {
auto it = _sockets.find(fd);
if (it == _sockets.end() && !monitor_events) {
return;
}
if (it != _sockets.end() && !monitor_events) {
it->second.socket->cancel();
it->second.socket->release();
_sockets.erase(it);
return;
}
shared_ptr<boost::asio::posix::stream_descriptor> socket;
int last_flags = 0;
if (it == _sockets.end()) {
socket = make_shared<boost::asio::posix::stream_descriptor>(_iocontext);
socket->assign(fd);
socket->non_blocking(true);
SocketEvent e = {.socket = socket, .flags = monitor_events};
_sockets[fd] = e;
} else {
socket = it->second.socket;
last_flags = it->second.flags;
it->second.flags = monitor_events;
}
if (monitor_events & AMQP::readable && !(last_flags & AMQP::readable)) {
poll_read(fd);
}
if (monitor_events & AMQP::writable && !(last_flags & AMQP::writable)) {
poll_write(fd);
}
}
void heartbeat(int seconds) {
_heartbeat_interval = seconds;
_timer = make_shared<boost::asio::deadline_timer>(
_iocontext, boost::posix_time::seconds(0));
start_timer();
}
};
map<AMQP::TcpConnection *, shared_ptr<Watcher>> _watchers;
void monitor(AMQP::TcpConnection *const connection, const int fd,
const int flags) override {
auto iter = _watchers.find(connection);
if (iter == _watchers.end()) {
const shared_ptr<Watcher> watcher =
make_shared<Watcher>(_iocontext, connection);
_watchers[connection] = watcher;
watcher->events(fd, flags);
} else {
iter->second->events(fd, flags);
}
}
virtual uint16_t onNegotiate(AMQP::TcpConnection *connection,
uint16_t interval) override {
spdlog::info("AMQP Heartbeat interval set to {}", interval);
auto it = _watchers.find(connection);
if (it == _watchers.end() || !interval) {
return 0;
}
it->second->heartbeat(interval);
return interval;
}
virtual void onDetached(AMQP::TcpConnection *connection) override {
auto iter = _watchers.find(connection);
if (iter != _watchers.end()) {
iter->second->stop();
_watchers.erase(iter);
}
}
public:
LibBoostAsioHandler() = delete;
explicit LibBoostAsioHandler(boost::asio::io_context &io_context)
: _iocontext(io_context) {}
~LibBoostAsioHandler() override = default;
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment