Skip to content

Instantly share code, notes, and snippets.

@DavidAntliff
Created August 30, 2021 23:53
Show Gist options
  • Save DavidAntliff/06828b7e5e4ad85a0cb2616ba9151127 to your computer and use it in GitHub Desktop.
Save DavidAntliff/06828b7e5e4ad85a0cb2616ba9151127 to your computer and use it in GitHub Desktop.
Support disconnection via subscription or timeout
/*
* Working integration of mqtt_cpp with producer thread.
* Having the worker thread started by the connack handler is recommended by the mqtt_cpp author.
* This program also demonstrates automatic disconnection by a timer, as well as handling disconnection by the broker.
* Additionally, a message can be sent to the 'quit' topic to initiate disconnection.
*
* E.g.
* $ mosquitto_pub -h localhost -t quit -m 1
*/
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>
#include <boost/asio.hpp>
#include <mqtt_client_cpp.hpp>
using namespace std::chrono;
// Target publish rate, in messages per second. The publisher thread derives its
// loop period from this value as 1 / PUBLISH_RATE.
constexpr int PUBLISH_RATE {25'000};
// A zero or negative rate would make the 1 / PUBLISH_RATE period meaningless
// (division by zero / negative period), so reject it at compile time.
static_assert(PUBLISH_RATE > 0, "PUBLISH_RATE must be a positive number of messages per second");
// Auto disconnect after this period. Set to zero to disable.
constexpr auto WATCHDOG_TIMER {std::chrono::seconds{0}};
// Bundles the Asio io_context with the MQTT client that is created on top of it.
struct App {
    // Type returned by MQTT_NS::make_sync_client() for an (io_context&, host, port)
    // call; deduced with decltype so it does not have to be written out by hand.
    using client_t = decltype(
        MQTT_NS::make_sync_client(std::declval<boost::asio::io_context&>(), "", 0)
    );

    // Declared before the client, so it is constructed first and destroyed last.
    boost::asio::io_context ioc{};

    // Client handle; left default-initialised here and assigned in main().
    client_t c;
};
// Shutdown flag for the publisher thread. It starts out true and is only ever
// cleared (by the close handler and by main() once the io_context has finished).
// Because task() itself never sets it back to true, a stop request issued before
// the thread gets scheduled can no longer be overwritten and leave join() hanging.
std::atomic<bool> running {true};

// Publisher thread body.
//
// Posts one publish of a fixed 16-byte payload to "mqtt_cpp_demo/topic1" per
// period, where the period is 1 / PUBLISH_RATE seconds, until `running` is cleared.
//
// c: client handle, held by reference; the caller must keep it alive until this
//    thread has been joined.
void task(App::client_t & c) {
    // accessing c is generally not thread-safe, but calling c->socket() and using the result to post() is OK

    // Pace the loop with steady_clock: it is monotonic, so wall-clock adjustments
    // cannot stretch or collapse the publish period.
    auto const period = round<steady_clock::duration>(duration<double>{1.0 / PUBLISH_RATE});
    auto deadline = steady_clock::now() + period;

    while (running) {
        // The publish itself is executed by whoever runs the socket's executor,
        // not by this thread.
        c->socket()->post(
            [&c] {
                c->publish("mqtt_cpp_demo/topic1", "0123456789ABCDEF", mqtt::qos::at_most_once);
            }
        );
        std::this_thread::sleep_until(deadline);
        // Advance from the previous deadline rather than from now(), so that the
        // time spent posting does not accumulate as drift.
        deadline += period;
    }
    std::cout << "task complete" << std::endl;
}
int main() {
std::thread t;
auto app = std::make_shared<App>();
app->c = mqtt::make_sync_client(app->ioc, "localhost", 1883);
app->c->set_client_id("mqtt_cpp_demo");
app->c->set_clean_session(true);
app->c->set_connack_handler(
[&c = app->c, &t](bool sp, mqtt::connect_return_code connack_return_code){
std::cout << "Connack handler called" << std::endl;
std::cout << "Session Present: " << std::boolalpha << sp << std::endl;
std::cout << "Connack Return Code: " << connack_return_code << std::endl;
if (connack_return_code == mqtt::connect_return_code::accepted) {
c->subscribe("quit", mqtt::qos::at_most_once);
// start the publish thread
t = std::thread(&task, std::ref(c));
}
return true;
}
);
app->c->set_close_handler( [](){
std::cout << "close connection" << std::endl;
running = false;
});
app->c->set_error_handler(
[](boost::system::error_code const & ec) {
std::cerr << "error " << ec << std::endl;
}
);
app->c->set_puback_handler(
[](std::uint16_t packet_id) { return true; }
);
app->c->set_pubrec_handler(
[](std::uint16_t packet_id) { return true; }
);
app->c->set_pubcomp_handler(
[](std::uint16_t packet_id) { return true; }
);
app->c->set_suback_handler(
[&c = app->c](std::uint16_t packet_id, std::vector<mqtt::suback_return_code> results){
std::cout << "suback received. packet_id: " << packet_id << std::endl;
for (auto const& e : results) {
std::cout << "subscribe result: " << e << std::endl;
}
return true;
}
);
app->c->set_publish_handler(
[&c = app->c]
(mqtt::optional<std::uint16_t> packet_id,
mqtt::publish_options pubopts,
mqtt::buffer topic_name,
mqtt::buffer contents) {
std::cout << "publish received."
<< " dup: " << pubopts.get_dup()
<< " qos: " << pubopts.get_qos()
<< " retain: " << pubopts.get_retain() << std::endl;
if (packet_id)
std::cout << "packet_id: " << *packet_id << std::endl;
std::cout << "topic_name: " << topic_name << std::endl;
std::cout << "contents: " << contents << std::endl;
if (topic_name == "quit") {
std::cout << "Disconnect" << std::endl;
c->disconnect();
}
return true;
}
);
std::thread watchdog;
if (WATCHDOG_TIMER > system_clock::duration::zero()) {
watchdog = std::thread([&]{
std::cout << "watchdog started (" << duration_cast<seconds>(WATCHDOG_TIMER).count() << " seconds)" << std::endl;
std::this_thread::sleep_for(WATCHDOG_TIMER);
std::cout << "watchdog expired" << std::endl;
app->ioc.stop();
std::cout << "ioc stopped" << std::endl;
});
}
app->c->connect();
app->ioc.run();
std::cout << "ioc finished" << std::endl;
// ask the thread to stop
running = false;
if (watchdog.joinable()) {
watchdog.join();
std::cout << "Watchdog thread joined" << std::endl;
}
if (t.joinable()) {
t.join();
std::cout << "Task thread joined" << std::endl;
}
std::cout << "Done" << std::endl;
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment