Skip to content

Instantly share code, notes, and snippets.

@erkoln
Created March 11, 2024 14:34
Show Gist options
  • Save erkoln/90a4efbf7c3f94d8c76a9a355939e2a2 to your computer and use it in GitHub Desktop.
Save erkoln/90a4efbf7c3f94d8c76a9a355939e2a2 to your computer and use it in GitHub Desktop.
non-working libmosquitto-based command-line MQTT subscriber
#include <errno.h>
#include <inttypes.h>
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <pthread.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/select.h>
#include <mosquitto.h>
// Timeout, in milliseconds, handed to epoll_wait() on every pass of the consumer loop.
#define MQTT_POLL_INTERVAL_MS 100
// Keepalive argument handed to mosquitto_connect().
#define MQTT_CONNECT_KEEPALIVE 10
// Set to true by on_signal(); polled by main() and consumer_loop() to shut down.
static volatile bool should_stop = false;
// Broker host name or address; overwritten by the -h command-line option.
char broker_host[128] = "127.0.0.1";
// Broker TCP port; overwritten by the -p command-line option.
uint16_t broker_port = 1883;
// Topic filter passed to mosquitto_subscribe(); overwritten by the -t command-line option.
char topic[128] = "#";
/*
 * Remove the previously watched socket from the epoll set.
 *
 * The result of epoll_ctl() is deliberately ignored: the old descriptor may
 * already be closed or invalid, in which case there is nothing to remove.
 */
static void unwatch_socket (struct epoll_event *pev, int sock, int epfd)
{
    pev->data.fd = sock;
    pev->events = EPOLLIN | EPOLLOUT | EPOLLET;
    epoll_ctl (epfd, EPOLL_CTL_DEL, sock, pev);
}

/*
 * Fetch the client's current socket, store it in *psock and add it to the
 * epoll set. Terminates the process if the socket cannot be watched.
 */
static void watch_socket (struct mosquitto *mosq, struct epoll_event *pev, int *psock, int epfd)
{
    *psock = mosquitto_socket (mosq);
    pev->data.fd = *psock;
    pev->events = EPOLLIN | EPOLLOUT | EPOLLET;
    if (epoll_ctl (epfd, EPOLL_CTL_ADD, *psock, pev) != 0) {
        // Capture errno before any further library call can overwrite it.
        int saved_errno = errno;
        fprintf (stderr, "Could not watch new socket: %s (%d), socket = %d\n",
                 strerror (saved_errno), saved_errno, *psock);
        exit (EXIT_FAILURE);
    }
}

/*
 * React to the return code of a libmosquitto loop call.
 *
 * result  return code to interpret (a MOSQ_ERR_* value)
 * mosq    client the call was made on
 * pev     scratch epoll_event used to (re)register the socket; it is
 *         overwritten whenever a reconnect takes place
 * psock   in/out: socket currently registered with epoll; updated to the new
 *         socket after a successful reconnect
 * epfd    epoll instance the socket is registered with
 *
 * Fatal conditions (TLS error, invalid parameter, out of memory, failed
 * reconnect, failure to watch the new socket) terminate the process.
 * Everything else is logged to stderr and the function returns.
 */
void handle_result (int result, struct mosquitto *mosq, struct epoll_event *pev, int *psock, int epfd)
{
    switch (result) {
    case MOSQ_ERR_SUCCESS:
        break;
    case MOSQ_ERR_KEEPALIVE:
        fprintf (stderr, "MOSQ_ERR_KEEPALIVE\n");
        break;
    case MOSQ_ERR_TLS:
        fprintf (stderr, "MOSQ_ERR_TLS\n");
        exit (EXIT_FAILURE);
    case MOSQ_ERR_INVAL:
        fprintf (stderr, "mosquitto_loop: Invalid parameter!\n");
        exit (EXIT_FAILURE);
    case MOSQ_ERR_NOMEM:
        fprintf (stderr, "mosquitto_loop: Out of memory!\n");
        exit (EXIT_FAILURE);
    case MOSQ_ERR_NO_CONN:
        fprintf (stderr, "mosquitto_loop: No connection. Reconnecting...\n");
        // The old socket must leave the epoll set before a new one is created.
        unwatch_socket (pev, *psock, epfd);
        result = mosquitto_connect (mosq, broker_host, broker_port, MQTT_CONNECT_KEEPALIVE);
        if (result != MOSQ_ERR_SUCCESS) {
            fprintf (stderr, "Could not connect to broker: %d (%d/%s)\n", result, errno, strerror(errno));
            exit (EXIT_FAILURE);
        }
        watch_socket (mosq, pev, psock, epfd);
        break;
    case MOSQ_ERR_CONN_LOST:
        fprintf (stderr, "mosquitto_loop: Connection lost. Reconnecting...\n");
        unwatch_socket (pev, *psock, epfd);
        // A failed reconnect leaves no usable socket, so report it here
        // rather than letting the epoll registration fail with a vague error.
        result = mosquitto_reconnect (mosq);
        if (result != MOSQ_ERR_SUCCESS) {
            fprintf (stderr, "Could not reconnect to broker: %s (%d)\n", mosquitto_strerror (result), result);
            exit (EXIT_FAILURE);
        }
        watch_socket (mosq, pev, psock, epfd);
        break;
    case MOSQ_ERR_PROTOCOL:
        fprintf (stderr, "mosquitto_loop: Protocol error!\n");
        break;
    case MOSQ_ERR_ERRNO:
        fprintf (stderr, "mosquitto_loop: syscall error; %s (%d)\n", strerror(errno), errno);
        break;
    default:
        fprintf (stderr, "mosquitto_loop: unknown error %d!\n", result);
        break;
    }
}
/*
 * Drive the MQTT client from an epoll-based event loop until should_stop is set.
 *
 * The client's socket is registered edge-triggered for both reading and
 * writing. Each pass waits up to MQTT_POLL_INTERVAL_MS for socket activity,
 * services reads and writes through mosquitto_loop_read()/_write(), and then
 * runs mosquitto_loop_misc(). Every return code is passed to handle_result(),
 * which may reconnect and replace the watched socket.
 *
 * mosq     connected client to service
 * returns  always NULL
 *
 * Failure to create the epoll instance or to register the socket terminates
 * the process.
 */
void* consumer_loop (struct mosquitto *mosq)
{
    int epfd = epoll_create1 (0);
    if (epfd < 0) {
        int saved_errno = errno;
        fprintf (stderr, "epoll_create1 failed: %s (%d)\n", strerror (saved_errno), saved_errno);
        exit (EXIT_FAILURE);
    }

    int sock = mosquitto_socket (mosq);
    struct epoll_event ev = {0};
    ev.data.fd = sock;
    ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
    if (epoll_ctl (epfd, EPOLL_CTL_ADD, sock, &ev) != 0) {
        int saved_errno = errno;
        fprintf (stderr, "Could not watch socket %d: %s (%d)\n", sock, strerror (saved_errno), saved_errno);
        exit (EXIT_FAILURE);
    }

    while ( !should_stop ) {
        if ( mosquitto_want_write (mosq) ) {
            // Re-arm the edge-triggered EPOLLOUT event so pending output
            // gets another writable notification.
            ev.data.fd = sock;
            ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
            if (epoll_ctl (epfd, EPOLL_CTL_MOD, sock, &ev) != 0) {
                int saved_errno = errno;
                fprintf (stderr, "Could not re-arm socket %d: %s (%d)\n", sock, strerror (saved_errno), saved_errno);
            }
        }

        int nready = epoll_wait (epfd, &ev, 1, MQTT_POLL_INTERVAL_MS);
        if (nready < 0) {
            // EINTR only means a signal arrived; the loop condition picks up
            // should_stop, so it is not worth reporting.
            if (errno != EINTR) {
                fprintf (stderr, "epoll_wait returned %d, errno: %d %s\n", nready, errno, strerror(errno));
            }
        }
        else if (nready > 0) {
            // Snapshot the reported events: handle_result() reuses `ev` for
            // re-registration and would otherwise clobber the mask before
            // the EPOLLOUT check below.
            const uint32_t fired = ev.events;

            if (fired & EPOLLIN) {
                // NOTE(review): with EPOLLET, data left unread after this
                // call will not raise another EPOLLIN until new data
                // arrives -- confirm mosquitto_loop_read() drains the socket.
                int result = mosquitto_loop_read (mosq, 1);
                handle_result (result, mosq, &ev, &sock, epfd);
            }
            if ( (fired & EPOLLOUT) && mosquitto_want_write (mosq) ) {
                int result = mosquitto_loop_write (mosq, 1);
                handle_result (result, mosq, &ev, &sock, epfd);
            }
        }

        // Periodic housekeeping runs on every pass, whether or not the
        // socket was ready.
        int result = mosquitto_loop_misc (mosq);
        handle_result (result, mosq, &ev, &sock, epfd);
    }

    close (epfd);
    return NULL;
}
/*
 * Connect callback: subscribe to the configured topic once the broker has
 * accepted the connection.
 *
 * mosq    client that received the CONNACK
 * unused  user data pointer (not used)
 * rc      CONNACK result code; 0 means the connection was accepted
 *
 * A non-zero rc means the broker refused the connection, so nothing is
 * subscribed and the reason is logged instead.
 */
void on_connect (struct mosquitto *mosq, void *unused, int rc)
{
    (void) unused;

    if (rc != 0) {
        fprintf (stderr, "connection refused by broker: %s (%d)\n", mosquitto_connack_string (rc), rc);
        return;
    }

    fprintf (stderr, "connected (%d).\n", rc);
    int result = mosquitto_subscribe (mosq, NULL, topic, 1);
    if (result != MOSQ_ERR_SUCCESS) {
        fprintf (stderr, "Subscription failed for topic \"%s\" on central broker: %d\n", topic, result);
    }
}
/*
 * Message callback: write the payload of a received message to stdout,
 * followed by a newline.
 *
 * mosq    client that received the message (not used)
 * unused  user data pointer (not used)
 * msg     received message; only payload and payloadlen are read
 *
 * The payload is written with its explicit length, so embedded NUL bytes are
 * preserved and an empty message (NULL payload) just produces an empty line.
 */
void on_message (struct mosquitto *mosq, void *unused, const struct mosquitto_message *msg)
{
    (void) mosq;
    (void) unused;

    if (msg->payload != NULL && msg->payloadlen > 0) {
        fwrite (msg->payload, 1, (size_t) msg->payloadlen, stdout);
    }
    fputc ('\n', stdout);
}
/*
 * Signal handler: request shutdown by raising the should_stop flag.
 *
 * sig  number of the delivered signal (not inspected)
 */
void on_signal (int sig)
{
    (void) sig;
    should_stop = true;
}
/*
 * Thread entry point with the exact signature pthread_create() expects.
 * Forwards to consumer_loop() so that function is never called through an
 * incompatible function-pointer type.
 */
static void *consumer_thread_entry (void *arg)
{
    return consumer_loop ((struct mosquitto *) arg);
}

/*
 * Command-line MQTT subscriber.
 *
 * Options:
 *   -h broker_host   broker host name or address
 *   -p broker_port   broker TCP port (1..65535)
 *   -t topic         topic filter to subscribe to
 *   -k keyfile       client private key file (enables TLS)
 *   -c certfile      client certificate file (enables TLS)
 *   -f cafile        CA certificate file (enables TLS)
 *
 * Connects to the broker, runs consumer_loop() on a worker thread and waits
 * until SIGINT or SIGTERM sets should_stop.
 *
 * Returns 0 on a clean shutdown and EXIT_FAILURE on any setup error.
 */
int main (int argc, char **argv)
{
    char client_id[256];
    char keyfile[256] = "";
    char certfile[256] = "";
    char cafile[256] = "";
    struct mosquitto *mosq = NULL;
    pthread_t thread;
    int result;
    int exit_code = EXIT_FAILURE;

    snprintf (client_id, sizeof(client_id), "sub-%u-%lu", (unsigned)getpid(), (unsigned long)time(NULL));

    // parse command-line parameters...
    int opt;
    while ( (opt = getopt(argc, argv, "h:p:t:k:c:f:")) != -1 ) {
        switch (opt) {
        case 'h':
            snprintf (broker_host, sizeof(broker_host), "%s", optarg);
            break;
        case 'p': {
            // strtol instead of atoi so garbage and out-of-range values are
            // rejected rather than silently truncated to 16 bits.
            char *end = NULL;
            errno = 0;
            long port = strtol (optarg, &end, 10);
            if (errno != 0 || end == optarg || *end != '\0' || port < 1 || port > 65535) {
                fprintf (stderr, "Invalid broker port: \"%s\"\n", optarg);
                return EXIT_FAILURE;
            }
            broker_port = (uint16_t) port;
            break;
        }
        case 't':
            snprintf (topic, sizeof(topic), "%s", optarg);
            break;
        case 'k':
            snprintf (keyfile, sizeof(keyfile), "%s", optarg);
            break;
        case 'c':
            snprintf (certfile, sizeof(certfile), "%s", optarg);
            break;
        case 'f':
            snprintf (cafile, sizeof(cafile), "%s", optarg);
            break;
        default:
            fprintf (stderr, "Usage: %s [-h broker_host] [-p broker_port] [-t topic] [-k keyfile] [-c certfile] [-f cafile]\n", argv[0]);
            return EXIT_FAILURE;
        }
    }

    // gobble up all low fds so libmosquitto can't get any...
    // Deliberate: the duplicates are never closed and the results are ignored.
    // Presumably this exercises the library with socket fds at or beyond
    // FD_SETSIZE -- confirm before changing.
    for (int i = 0; i < FD_SETSIZE; ++i) {
        dup (1);
    }

    // libmosquitto must be initialised before any other library call.
    if ( mosquitto_lib_init () != MOSQ_ERR_SUCCESS ) {
        fprintf (stderr, "failed to initialize libmosquitto\n");
        return EXIT_FAILURE;
    }

    mosq = mosquitto_new (client_id, true, NULL);
    if (mosq == NULL) {
        fprintf (stderr, "failed to allocate Mosquitto client: %d/%s\n", errno, strerror(errno));
        goto cleanup;
    }
    if ( mosquitto_threaded_set (mosq, true) != MOSQ_ERR_SUCCESS ) {
        fprintf (stderr, "mosquitto_threaded_set() failed for main MQTT client!\n" );
    }

    if ( keyfile[0] || certfile[0] || cafile[0] ) {
        fprintf (stderr, "initializing TLS...\nkey file: %s\ncert file: %s\nCA file: %s\n", keyfile, certfile, cafile);
        // Unset paths are passed as NULL, not as empty strings, so the
        // library never tries to open a file named "".
        result = mosquitto_tls_set (
            mosq,
            cafile[0] ? cafile : NULL,
            NULL,
            certfile[0] ? certfile : NULL,
            keyfile[0] ? keyfile : NULL,
            NULL );
        if ( result != MOSQ_ERR_SUCCESS ) {
            fprintf (stderr, "failed to initialize TLS parameters for mosquitto client: %d (%d/%s)\n", result, errno, strerror(errno) );
            goto cleanup;
        }
    }

    mosquitto_connect_callback_set (mosq, on_connect);
    mosquitto_message_callback_set (mosq, on_message);

    fprintf (stderr, "connecting MQTT subscriber to %s:%"PRIu16"...\n", broker_host, broker_port);
    result = mosquitto_connect (mosq, broker_host, broker_port, MQTT_CONNECT_KEEPALIVE);
    if (result != MOSQ_ERR_SUCCESS) {
        fprintf (stderr, "connection failed: %d (%d/%s)\n", result, errno, strerror(errno));
        goto cleanup;
    }

    signal (SIGINT, on_signal);
    signal (SIGTERM, on_signal);

    // pthread_create() returns the error number directly; it does not set errno.
    int thread_err = pthread_create (&thread, NULL, consumer_thread_entry, mosq);
    if (thread_err != 0) {
        fprintf (stderr, "could not start consumer thread: %s (%d)\n", strerror (thread_err), thread_err);
        goto cleanup;
    }

    while (!should_stop) {
        usleep (100000);
    }
    pthread_join (thread, NULL);

    // Best-effort disconnect: the connection may already be gone.
    mosquitto_disconnect (mosq);
    exit_code = 0;

cleanup:
    if (mosq != NULL) {
        mosquitto_destroy (mosq);
    }
    mosquitto_lib_cleanup ();
    return exit_code;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment