Skip to content

Instantly share code, notes, and snippets.

@tomlankhorst
Created May 11, 2021 14:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tomlankhorst/64ee0443565ebb20bd89631ddc240cb7 to your computer and use it in GitHub Desktop.
Save tomlankhorst/64ee0443565ebb20bd89631ddc240cb7 to your computer and use it in GitHub Desktop.
MQTT mosquitto demo (with fmt/7.1.3 mosquitto/2.0.10)
#include <chrono>
#include <csignal>
#include <cstddef>
#include <exception>
#include <iostream>
#include <optional>
#include <string>
#include <string_view>
#include <thread>
#include <fmt/core.h>
#include <fmt/color.h>
#include <mosquitto.h>
#include <openssl/ssl.h>
bool exit_flag = false;
bool debug_log = false;
using namespace std::chrono_literals;
int main(int argc, char** argv) {
struct sigaction act {};
act.sa_handler = [](int s){
fmt::print("Interrupted\n");
exit_flag = true;
};
sigaction(SIGINT, &act, nullptr);
if (argc != 3 && argc != 4 && argc != 6) {
fmt::print("Usage: mqtt hostname port (cafile (username password))\n");
return 0;
}
fmt::print("mqtt demo\n");
mosquitto* mosq = nullptr;
int rc = MOSQ_ERR_SUCCESS;
// Init lib
mosquitto_lib_init();
// Say hello
int major, minor, rev;
mosquitto_lib_version(&major, &minor, &rev);
fmt::print(fmt::fg(fmt::color::blue_violet), "libmosquitto {}.{}.{}\n{}\n", major, minor, rev, OPENSSL_VERSION_TEXT);
// Create new instance
mosq = mosquitto_new(nullptr, true, nullptr);
if (!mosq) {
fmt::print("Out of memory\n");
return -1;
}
// Callbacks
mosquitto_connect_callback_set(mosq, [](struct mosquitto *mosq, void *obj, int reason_code){
fmt::print("on_connect: {}\n", mosquitto_connack_string(reason_code));
if (reason_code != 0)
mosquitto_disconnect(mosq);
});
mosquitto_publish_callback_set(mosq, [](struct mosquitto *mosq, void *obj, int mid){
});
mosquitto_subscribe_callback_set(mosq, [](struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos){
bool have_subscription = false;
for(auto i = 0; i < qos_count; i++){
fmt::print("on_subscribe: {}, granted qos = {}\n", i, granted_qos[i]);
if(granted_qos[i] <= 2){
have_subscription = true;
}
}
if (!have_subscription){
/* The broker rejected all of our subscriptions, we know we only sent
* the one SUBSCRIBE, so there is no point remaining connected. */
fmt::print(fmt::fg(fmt::color::red), "Error: All subscriptions rejected.\n");
mosquitto_disconnect(mosq);
}
});
if (debug_log) {
mosquitto_log_callback_set(mosq, [](struct mosquitto *mosq, void *obj, int level, const char *str){
fmt::print(fmt::fg(fmt::color::light_gray), "level {}: {}\n", level, str);
});
}
mosquitto_message_callback_set(mosq, [](struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg) {
fmt::print(fmt::fg(fmt::color::olive), "Received on {}: {} ({})\n", msg->topic, std::string_view{(char*)msg->payload, (size_t)msg->payloadlen}, msg->qos);
});
if (argc >= 6) {
rc = mosquitto_username_pw_set(mosq, argv[4], argv[5]);
if (rc != MOSQ_ERR_SUCCESS) {
mosquitto_destroy(mosq);
fmt::print("Error setting username and password: {}\n", mosquitto_strerror(rc));
return 1;
}
}
if (argc >= 4) {
rc = mosquitto_tls_set(mosq, argv[3], nullptr, nullptr, nullptr, nullptr);
if (rc != MOSQ_ERR_SUCCESS) {
mosquitto_destroy(mosq);
fmt::print("Error setting TLS: {}\n", mosquitto_strerror(rc));
return 1;
}
// verify server certificate
mosquitto_tls_opts_set(mosq, SSL_VERIFY_PEER, NULL, NULL);
}
// Connect
int port = std::stoi(argv[2]);
std::string host = argv[1];
fmt::print(fmt::fg(fmt::color::gray), "Connecting to {}:{}\n", host, port);
rc = mosquitto_connect(mosq, host.c_str(), port, 60);
if(rc != MOSQ_ERR_SUCCESS){
mosquitto_destroy(mosq);
fmt::print("Error: {}\n", mosquitto_strerror(rc));
return 1;
}
rc = mosquitto_loop_start(mosq);
if(rc != MOSQ_ERR_SUCCESS){
mosquitto_destroy(mosq);
fmt::print("Error: {}\n", mosquitto_strerror(rc));
return 1;
}
mosquitto_subscribe(mosq, nullptr, "#", 1);
auto start = std::chrono::system_clock::now();
while (!exit_flag) {
std::string payload = fmt::format("Hello from MQTT client {}", (std::chrono::system_clock::now() - start).count());
rc = mosquitto_publish(mosq, nullptr, "hello/world", payload.size(), payload.c_str(), 1, false);
if(rc != MOSQ_ERR_SUCCESS){
fmt::print("Error publishing: {}\n", mosquitto_strerror(rc));
}
std::this_thread::sleep_for(10ms);
}
// Clean-up lib
mosquitto_lib_cleanup();
return 0;
}
@tomlankhorst
Copy link
Author

tomlankhorst commented May 11, 2021

$ ./mqtt-demo mqtt.my.org 8883 \
    /usr/share/ca-certificates/mozilla/DST_Root_CA_X3.crt \
    {{username}} {{password}}

gives

mqtt demo
libmosquitto 2.0.10
OpenSSL 1.1.1k  25 Mar 2021
Connecting to mqtt.my.org:8883
on_connect: Connection Accepted.
on_subscribe: 0, granted qos = 1
Received on digger_trigger/machine1: hello (0)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment