Skip to content

Instantly share code, notes, and snippets.

@pvtom
Created January 8, 2022 15:21
Show Gist options
  • Save pvtom/1ad56c5c4c3d6d6f98347b8037938217 to your computer and use it in GitHub Desktop.
Save pvtom/1ad56c5c4c3d6d6f98347b8037938217 to your computer and use it in GitHub Desktop.
MQTT Subscription Client based on Mosquitto Library
/*
* Preparation: sudo apt-get install libmosquitto-dev
* Compilation: gcc mqttsub.c -o mqttsub -lmosquitto
* Usage: ./mqttsub --help
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#if !defined(__MACH__)
#include <malloc.h>
#endif
#include <sys/time.h>
#include <time.h>
#include <libgen.h>
#include <regex.h>
#include <mosquitto.h>
typedef struct _mqttattr {
int qos;
int mode;
int verbose;
int count;
int tlen;
char **topics;
int flen;
char **filters;
} mqttattr;
mqttattr create_mqttattr() {
mqttattr c;
c.qos = 0;
c.mode = 0;
c.verbose = 0;
c.count = 0;
c.tlen = 0;
c.topics = NULL;
c.flen = 0;
c.filters = NULL;
return(c);
}
int add_topic(mqttattr *mqtta, char *topic) {
if (mosquitto_sub_topic_check(topic) == MOSQ_ERR_SUCCESS) {
char **p;
p = (char**) realloc(mqtta->topics, (mqtta->tlen + 1) * sizeof(char*));
if (p != NULL) {
mqtta->topics = p;
mqtta->topics[mqtta->tlen] = topic;
mqtta->tlen++;
return(mqtta->tlen);
}
} else {
fprintf(stderr, "Warning: '%s' is not a valid topic\n", topic);
}
return(0);
}
int add_filter(mqttattr *mqtta, char *filter) {
char **p;
p = (char**) realloc(mqtta->filters, (mqtta->flen + 1) * sizeof(char*));
if (p != NULL) {
mqtta->filters = p;
mqtta->filters[mqtta->flen] = filter;
mqtta->flen++;
return(mqtta->flen);
}
return(0);
}
void destroy_mqttattr(mqttattr *mqtta) {
if (mqtta) {
if (mqtta->topics) free(mqtta->topics);
mqtta->tlen = 0;
if (mqtta->filters) free(mqtta->filters);
mqtta->flen = 0;
}
return;
}
char *now(char *ts) {
struct tm *timeinfo;
struct timeval tv;
gettimeofday(&tv, NULL);
timeinfo = localtime(&tv.tv_sec);
sprintf(ts, "%.4d%.2d%.2d%.2d%.2d%.2d.%.6d", timeinfo->tm_year+1900,timeinfo->tm_mon+1,timeinfo->tm_mday,timeinfo->tm_hour,timeinfo->tm_min,timeinfo->tm_sec,tv.tv_usec);
return(ts);
}
int regex_match(char *string, char *pattern) {
regex_t preg;
size_t nmatch = 1;
regmatch_t pmatch[nmatch];
if (regcomp(&preg, pattern, REG_EXTENDED|REG_NEWLINE)) {
return(0);
}
if (regexec(&preg, string, nmatch, pmatch, 0) == REG_NOMATCH) {
regfree(&preg);
return(0);
} else {
regfree(&preg);
return(1);
}
}
void connect_callback(struct mosquitto *mosq, void *obj, int result) {
char timestamp[24];
now(timestamp);
mqttattr *mqtta = obj;
if (!result) {
mosquitto_subscribe_multiple(mosq, NULL, mqtta->tlen, (char *const *const)mqtta->topics, mqtta->qos, 0, NULL);
if (mqtta->verbose) {
printf("[%s] MQTT broker connected.\n", timestamp);
int i;
for (i = 0; i < mqtta->tlen; i++) {
printf("[%s] topic '%s' subscribed.\n", timestamp, mqtta->topics[i]);
}
for (i = 0; i < mqtta->flen; i++) {
printf("[%s] topics matching with '%s' will be filtered out.\n", timestamp, mqtta->filters[i]);
}
}
}
return;
}
void message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) {
bool match = 0;
char timestamp[24];
mqttattr *mqtta = obj;
int i;
static int topic_width = 0;
static int match_width = 0;
now(timestamp);
for (i = 0; i < mqtta->flen; i++) {
if (regex_match(message->topic, mqtta->filters[i])) return;
}
for (i = 0; i < mqtta->tlen; i++) {
mosquitto_topic_matches_sub(mqtta->topics[i], message->topic, &match);
if (match) break;
}
if (mqtta->count > 0) mqtta->count = mqtta->count - 1;
switch (mqtta->mode) {
case 1:
if (topic_width < strlen(message->topic)) topic_width = strlen(message->topic);
if (match_width < strlen(mqtta->topics[i])) match_width = strlen(mqtta->topics[i]);
printf("[%s] %-*s %-*s %.*s\n", timestamp, match_width, mqtta->topics[i], topic_width, message->topic, message->payloadlen, (char*)message->payload);
break;
case 2:
printf("[%s] %s %s %.*s\n", timestamp, mqtta->topics[i], message->topic, message->payloadlen, (char*)message->payload);
break;
case 3:
printf("[%s] %s %.*s\n", timestamp, message->topic, message->payloadlen, (char*)message->payload);
break;
default:
printf("%s %.*s\n", message->topic, message->payloadlen, (char*)message->payload);
break;
}
return;
}
int main(int argc, char **argv) {
char *mqtt_host = NULL;
int mqtt_port = 1883;
char cid[20];
struct mosquitto *mosq;
int term = 0;
int rc = 0;
int i = 0;
mqttattr mqtta = create_mqttattr();
while (i < argc) {
if ((!strcmp(argv[i], "-h")) && (i+1 < argc)) mqtt_host = argv[++i];
if ((!strcmp(argv[i], "-p")) && (i+1 < argc)) mqtt_port = atoi(argv[++i]);
if ((!strcmp(argv[i], "-q")) && (i+1 < argc)) mqtta.qos = atoi(argv[++i]);
if ((!strcmp(argv[i], "-m")) && (i+1 < argc)) mqtta.mode = atoi(argv[++i]);
if ((!strcmp(argv[i], "-t")) && (i+1 < argc)) add_topic(&mqtta, argv[++i]);
if ((!strcmp(argv[i], "-f")) && (i+1 < argc)) add_filter(&mqtta, argv[++i]);
if ((!strcmp(argv[i], "-r")) && (i+1 < argc)) term = atoi(argv[++i]);
if ((!strcmp(argv[i], "-n")) && (i+1 < argc)) mqtta.count = atoi(argv[++i]);
if (!strcmp(argv[i], "-v")) mqtta.verbose = 1;
if (!strcmp(argv[i], "--help")) {
printf("MQTT subscription tool\nusage: %s -h <host> -p <port> -q <qos 0..2> -m <output mode 0..3> -t <topic 1> ... -t <topic n> -f <regex filter 1> ... -f <regex filter n> -r <runtime in seconds> -n <number of excepted messages> -v --help\n", basename(argv[0]));
return(0);
}
i++;
}
if (!mqtt_host) mqtt_host = "localhost";
if (!mqtta.tlen) add_topic(&mqtta, "#");
if ((mqtta.qos < 0) || (mqtta.qos > 2)) mqtta.qos = 0;
sprintf(cid, "mosqsub/%d", getpid());
mosquitto_lib_init();
mosq = mosquitto_new(cid, true, &mqtta);
if (mosq) {
mosquitto_connect_callback_set(mosq, connect_callback);
mosquitto_message_callback_set(mosq, message_callback);
if (mqtta.verbose) {
int major, minor, revision;
mosquitto_lib_version(&major, &minor, &revision);
printf("%s (libmosquitto %d.%d.%d)\n", basename(argv[0]), major, minor, revision);
printf("Connecting (%s) to %s:%d with qos=%d\n", cid, mqtt_host, mqtt_port, mqtta.qos);
}
rc = mosquitto_connect(mosq, mqtt_host, mqtt_port, 60);
if (!rc) {
if (term || mqtta.count) {
if (term && mqtta.verbose) printf("Looping for %d seconds\n", term);
if (mqtta.count && mqtta.verbose) printf("Looping until %d message(s) received\n", mqtta.count);
time_t start = time(NULL);
int c = !mqtta.count;
while((!term || (time(NULL) < start + term)) && (c || mqtta.count)) {
rc = mosquitto_loop(mosq, -1, 1);
if (rc) {
sleep(1);
mosquitto_reconnect(mosq);
}
}
} else {
if (mqtta.verbose) printf("Looping forever...\n");
rc = mosquitto_loop_forever(mosq, -1, 1);
}
} else {
fprintf(stderr, "Error: Could not connect to '%s:%d'\n", mqtt_host, mqtt_port);
}
mosquitto_disconnect(mosq);
mosquitto_destroy(mosq);
}
mosquitto_lib_cleanup();
destroy_mqttattr(&mqtta);
return(rc);
}
@pvtom
Copy link
Author

pvtom commented Jan 8, 2022

Usage: mqttsub -h -p -q <qos 0..2> -m <output mode 0..3> -t <topic 1> ... -t -f <regex filter 1> ... -f -r -n -v --help

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment