-
-
Save erkoln/90a4efbf7c3f94d8c76a9a355939e2a2 to your computer and use it in GitHub Desktop.
non-working libmosquitto-based command-line MQTT subscriber
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <stdio.h> | |
#include <stdlib.h> | |
#include <string.h> | |
#include <inttypes.h> | |
#include <mosquitto.h> | |
#include <errno.h> | |
#include <stdbool.h> | |
#include <pthread.h> | |
#include <unistd.h> | |
#include <sys/epoll.h> | |
#include <sys/select.h> | |
#include <pthread.h> | |
#include <signal.h> | |
#define MQTT_POLL_INTERVAL_MS 100 | |
#define MQTT_CONNECT_KEEPALIVE 10 | |
static volatile bool should_stop = false; | |
char broker_host[128] = "127.0.0.1"; | |
uint16_t broker_port = 1883; | |
char topic[128] = "#"; | |
void handle_result (int result, struct mosquitto *mosq, struct epoll_event *pev, int *psock, int epfd) | |
{ | |
int err; | |
switch (result) { | |
case MOSQ_ERR_SUCCESS: | |
break; | |
case MOSQ_ERR_KEEPALIVE: | |
fprintf (stderr, "MOSQ_ERR_KEEPALIVE\n"); | |
break; | |
case MOSQ_ERR_TLS: | |
fprintf (stderr, "MOSQ_ERR_TLS\n"); | |
exit (EXIT_FAILURE); | |
break; | |
case MOSQ_ERR_INVAL: | |
fprintf (stderr, "mosquitto_loop: Invalid parameter!\n"); | |
exit (EXIT_FAILURE); | |
case MOSQ_ERR_NOMEM: | |
fprintf (stderr, "mosquitto_loop: Out of memory!\n"); | |
exit (EXIT_FAILURE); | |
case MOSQ_ERR_NO_CONN: | |
fprintf (stderr, "mosquitto_loop: No connection. Reconnecting...\n"); | |
// Stop watching old socket FD... | |
pev->data.fd = *psock; | |
pev->events = EPOLLIN | EPOLLOUT | EPOLLET; | |
epoll_ctl(epfd, EPOLL_CTL_DEL, pev->data.fd, pev); | |
// (Re)connect to broker... | |
result = mosquitto_connect(mosq, broker_host, broker_port, MQTT_CONNECT_KEEPALIVE); | |
if (result != MOSQ_ERR_SUCCESS) { | |
fprintf (stderr, "Could not connect to broker: %d (%d/%s)\n", result, errno, strerror(errno)); | |
exit (EXIT_FAILURE); | |
} | |
// Start watching new socket FD... | |
pev->data.fd = *psock = mosquitto_socket(mosq); | |
pev->events = EPOLLIN | EPOLLOUT | EPOLLET; | |
err = epoll_ctl(epfd, EPOLL_CTL_ADD, pev->data.fd, pev); | |
if (err != 0) { | |
fprintf (stderr, "Could not watch new socket, err != 0: %d, socket = %d\n", err, *psock ); | |
exit (EXIT_FAILURE); | |
} | |
break; | |
case MOSQ_ERR_CONN_LOST: | |
fprintf (stderr, "mosquitto_loop: Connection lost. Reconnecting...\n"); | |
// Stop watching old socket FD... | |
pev->data.fd = *psock; | |
pev->events = EPOLLIN | EPOLLOUT | EPOLLET; | |
epoll_ctl(epfd, EPOLL_CTL_DEL, pev->data.fd, pev); | |
// Reconnect... | |
mosquitto_reconnect(mosq); | |
// Start watching new socket FD... | |
pev->data.fd = *psock = mosquitto_socket(mosq); | |
pev->events = EPOLLIN | EPOLLOUT | EPOLLET; | |
err = epoll_ctl(epfd, EPOLL_CTL_ADD, pev->data.fd, pev); | |
if (err != 0) { | |
fprintf (stderr, "Could not watch new socket, err != 0: %d, socket = %d\n", err, *psock ); | |
exit (EXIT_FAILURE); | |
} | |
break; | |
case MOSQ_ERR_PROTOCOL: | |
fprintf (stderr, "mosquitto_loop: Protocol error!\n"); | |
break; | |
case MOSQ_ERR_ERRNO: | |
fprintf (stderr, "mosquitto_loop: syscall error; %s (%d)\n", strerror(errno), errno); | |
break; | |
default: | |
fprintf (stderr, "mosquitto_loop: unknown error %d!\n", result); | |
break; | |
} | |
} | |
void* consumer_loop (struct mosquitto *mosq) | |
{ | |
struct epoll_event ev; | |
int epfd = epoll_create1(0); | |
if (epfd < 0) { | |
fprintf (stderr, "epfd < 0: %d\n", epfd); | |
exit (EXIT_FAILURE); | |
} | |
int sock = mosquitto_socket(mosq); | |
ev.data.fd = sock; | |
ev.events = EPOLLIN | EPOLLOUT | EPOLLET; | |
int err = epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev); | |
if (err != 0) { | |
fprintf (stderr, "err != 0: %d\n", err); | |
exit (EXIT_FAILURE); | |
} | |
while ( !should_stop ) { | |
if ( mosquitto_want_write(mosq) ) { | |
// Re-arm EPOLLOUT event... | |
ev.data.fd = sock; | |
ev.events = EPOLLIN | EPOLLOUT | EPOLLET; | |
err = epoll_ctl(epfd, EPOLL_CTL_MOD, ev.data.fd, &ev); | |
} | |
err = epoll_wait(epfd, &ev, 1, MQTT_POLL_INTERVAL_MS); | |
if (err < 0) { | |
fprintf (stderr, "epoll_wait returned %d, errno: %d %s\n", err, errno, strerror(errno)); | |
} | |
else if (err > 0) { | |
if (ev.events & EPOLLIN) { | |
int result = mosquitto_loop_read(mosq, 1); | |
handle_result (result, mosq, &ev, &sock, epfd); | |
} | |
if ( (ev.events & EPOLLOUT) | |
&& mosquitto_want_write(mosq) ) | |
{ | |
int result = mosquitto_loop_write(mosq, 1); | |
handle_result (result, mosq, &ev, &sock, epfd); | |
} | |
} | |
int result = mosquitto_loop_misc(mosq); | |
handle_result (result, mosq, &ev, &sock, epfd); | |
} | |
return NULL; | |
} | |
void on_connect (struct mosquitto *mosq, void *unused, int rc) | |
{ | |
fprintf (stderr, "connected (%d).\n", rc); | |
int result = mosquitto_subscribe (mosq, NULL, topic, 1); | |
if (result != MOSQ_ERR_SUCCESS) { | |
fprintf (stderr, "Subscription failed for topic \"%s\" on central broker: %d\n", topic, result); | |
} | |
} | |
void on_message (struct mosquitto *mosq, void *unused, const struct mosquitto_message *msg) | |
{ | |
fprintf (stdout, "%.*s\n", msg->payloadlen, (const char*)msg->payload); | |
} | |
void on_signal (int sig) | |
{ | |
should_stop = true; | |
} | |
int main (int argc, char **argv) | |
{ | |
char client_id[256]; | |
char keyfile[256] = ""; | |
char certfile[256] = ""; | |
char cafile[256] = ""; | |
snprintf (client_id, sizeof(client_id), "sub-%u-%lu", (unsigned)getpid(), (unsigned long)time(NULL)); | |
// parse command-line parameters... | |
int opt; | |
while ( (opt = getopt(argc, argv, "h:p:t:k:c:f:")) != -1 ) { | |
switch (opt) { | |
case 'h': | |
snprintf (broker_host, sizeof(broker_host), "%s", optarg); | |
break; | |
case 'p': | |
broker_port = (uint16_t) atoi (optarg); | |
break; | |
case 't': | |
snprintf (topic, sizeof(topic), "%s", optarg); | |
break; | |
case 'k': | |
snprintf (keyfile, sizeof(keyfile), "%s", optarg); | |
break; | |
case 'c': | |
snprintf (certfile, sizeof(certfile), "%s", optarg); | |
break; | |
case 'f': | |
snprintf (cafile, sizeof(cafile), "%s", optarg); | |
break; | |
default: | |
fprintf (stderr, "Usage: %s [-h broker_host] [-p broker_port] [-t topic] [-k keyfile] [-c certfile] [-f cafile]\n", argv[0]); | |
return EXIT_FAILURE; | |
} | |
} | |
// gobble up all low fds so libmosquitto can't get any... | |
for (int i=0 ; i<FD_SETSIZE ; ++i) | |
dup (1); | |
struct mosquitto *mosq = mosquitto_new(client_id, true, NULL); | |
if (mosq == NULL) { | |
fprintf (stderr, "failed to allocate Mosquitto client: %d/%s\n", errno, strerror(errno)); | |
return EXIT_FAILURE; | |
} | |
if ( mosquitto_threaded_set (mosq, true) != MOSQ_ERR_SUCCESS ) { | |
fprintf (stderr, "mosquitto_threaded_set() failed for main MQTT client!\n" ); | |
} | |
if ( keyfile[0] || certfile[0] || cafile[0] ) { | |
fprintf (stderr, "initializing TLS...\nkey file: %s\ncert file: %s\nCA file: %s\n", keyfile, certfile, cafile); | |
int result = mosquitto_tls_set ( | |
mosq, | |
cafile, | |
NULL, | |
certfile[0] ? certfile : NULL, | |
keyfile[0] ? keyfile : NULL, | |
NULL ); | |
if ( result != MOSQ_ERR_SUCCESS ) { | |
fprintf (stderr, "failed to initialize TLS parameters for mosquitto client: %d (%d/%s)", result, errno, strerror(errno) ); | |
return EXIT_FAILURE; | |
} | |
} | |
mosquitto_connect_callback_set (mosq, on_connect); | |
mosquitto_message_callback_set (mosq, on_message); | |
fprintf (stderr, "connecting MQTT subscriber to %s:%"PRIu16"...\n", broker_host, broker_port); | |
int result = mosquitto_connect(mosq, broker_host, broker_port, MQTT_CONNECT_KEEPALIVE); | |
if (result != MOSQ_ERR_SUCCESS) { | |
fprintf (stderr, "connection failed: %d (%d/%s)\n", result, errno, strerror(errno)); | |
return EXIT_FAILURE; | |
} | |
signal (SIGINT, on_signal); | |
signal (SIGTERM, on_signal); | |
pthread_t thread; | |
pthread_create (&thread, NULL, (void*(*)(void*))&consumer_loop, mosq); | |
while (!should_stop) { | |
usleep (100000); | |
} | |
pthread_join (thread, NULL); | |
//consumer_loop(mosq); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment