Skip to content

Instantly share code, notes, and snippets.

@DavidAntliff
Created August 27, 2021 02:30
Show Gist options
  • Save DavidAntliff/f6e600dc146516f64d60fd68660875bf to your computer and use it in GitHub Desktop.
Save DavidAntliff/f6e600dc146516f64d60fd68660875bf to your computer and use it in GitHub Desktop.
Move the creation of the worker thread to the connack handler
#include <chrono>
#include <iostream>
#include <memory>
#include <thread>

#include <boost/asio.hpp>
#include <mqtt_client_cpp.hpp>
/// Bundles the Asio io_context with the MQTT client that runs on it.
struct App {
    /// Whatever MQTT_NS::make_sync_client() returns when given an io_context,
    /// a host string and a port; deduced here so the exact type need not be
    /// spelled out.
    using client_t = decltype(
        MQTT_NS::make_sync_client(std::declval<boost::asio::io_context &>(), "", 0));

    /// Event loop; declared before `c` so it is constructed first and
    /// destroyed last.
    boost::asio::io_context ioc{};

    /// Client handle; left default-initialised here and assigned in main().
    client_t c;
};
/// Worker-thread body: endlessly posts "publish" jobs to the client's socket.
///
/// Per the original author's note: accessing `c` is generally not thread-safe,
/// but calling c->socket() and using the result to post() is OK. The publish
/// itself therefore happens inside the posted handler, not in this thread.
///
/// @param c  Client handle; captured by reference by the posted handler, so
///           it must outlive every handler that is still queued.
///
/// This function never returns.
void task(App::client_t & c) {
    // Job that performs one QoS-0 publish when the posted handler is invoked.
    auto publish_once = [&c] {
        c->publish("mqtt_cpp_demo/topic1", "0123456789ABCDEF", mqtt::qos::at_most_once);
    };

    for (;;) {
        // Optional throttling / tracing, disabled in the original:
        //std::this_thread::sleep_for(std::chrono::milliseconds(1));
        //std::cout << "publish" << std::endl;
        c->socket()->post(publish_once);
    }
}
int main() {
std::thread t;
App appInstance;
std::shared_ptr<App> app {&appInstance};
app->c = mqtt::make_sync_client(app->ioc, "localhost", 1883);
app->c->set_client_id("mqtt_cpp_demo");
app->c->set_clean_session(true);
app->c->set_connack_handler(
[&c = app->c, &t](bool sp, mqtt::connect_return_code connack_return_code){
std::cout << "Connack handler called" << std::endl;
std::cout << "Session Present: " << std::boolalpha << sp << std::endl;
std::cout << "Connack Return Code: " << connack_return_code << std::endl;
// start the publish thread
t = std::thread(&task, std::ref(c));
return true;
}
);
app->c->set_close_handler( [](){} );
app->c->set_error_handler(
[](boost::system::error_code const & ec) {}
);
app->c->set_puback_handler(
[](std::uint16_t packet_id) { return true; }
);
app->c->set_pubrec_handler(
[](std::uint16_t packet_id) { return true; }
);
app->c->set_pubcomp_handler(
[](std::uint16_t packet_id) { return true; }
);
app->c->set_suback_handler(
[&c = app->c](std::uint16_t packet_id, std::vector<mqtt::suback_return_code> results){
std::cout << "suback received. packet_id: " << packet_id << std::endl;
for (auto const& e : results) {
std::cout << "subscribe result: " << e << std::endl;
}
return true;
}
);
app->c->set_publish_handler(
[&c = app->c]
(mqtt::optional<std::uint16_t> packet_id,
mqtt::publish_options pubopts,
mqtt::buffer topic_name,
mqtt::buffer contents) {
std::cout << "publish received."
<< " dup: " << pubopts.get_dup()
<< " qos: " << pubopts.get_qos()
<< " retain: " << pubopts.get_retain() << std::endl;
if (packet_id)
std::cout << "packet_id: " << *packet_id << std::endl;
std::cout << "topic_name: " << topic_name << std::endl;
std::cout << "contents: " << contents << std::endl;
//c->disconnect();
return true;
}
);
app->c->connect();
app->ioc.run();
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment