Last active
July 30, 2019 14:21
-
-
Save jpmens/10820081 to your computer and use it in GitHub Desktop.
Send MQTT payloads (incl JSON) as UDP GELF (compressed) to Graylog2
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <errno.h> | |
#include <fcntl.h> | |
#include <stdio.h> | |
#include <stdlib.h> | |
#include <string.h> | |
#include <unistd.h> | |
#include <signal.h> | |
#include <arpa/inet.h> | |
#include <netinet/in.h> | |
#include <netdb.h> | |
#include <sys/socket.h> | |
#include <sys/time.h> | |
#include <ctype.h> | |
#include <mosquitto.h> | |
#include <json/json.h> | |
#include <zlib.h> | |
/* | |
* Jan-Piet Mens, April 2014 | |
* Listen to any number of MQTT topics and send payloads of received | |
* messages to Graylog2 (UDP GELF). | |
* If the MQTT payload contains JSON, attempt to extract the elements into the | |
* GELF payload. | |
* | |
* cc -Wall -Werror sub2gelf.c -o sub2gelf -lmosquitto -ljson-c -lz | |
*/ | |
#define GRAYLOG_HOST "ubu.prox" | |
#define GELF_PORT "12201" | |
/* 0=Emerg, 1=Alert, 2=Crit, 3=Error, 4=Warn, 5=Notice, 6=Info */ | |
#define LEVEL 6 | |
#define FACILITY "jp-sender" | |
struct userdata { | |
int udpsock; | |
struct sockaddr *sa; | |
socklen_t salen; | |
}; | |
static int run = 1; | |
static int verbose = 1; | |
void sendgelf(struct userdata *ud, const char *payload, int payloadlen) | |
{ | |
int rc; | |
rc = sendto(ud->udpsock, payload, payloadlen, 0, ud->sa, ud->salen); | |
if (rc < 0) { | |
perror("sendto"); | |
} | |
} | |
struct zbuf { | |
int size; | |
void *data; | |
}; | |
void z_compress(struct zbuf *zbuf, void *buf, int buflen) | |
{ | |
z_stream st; | |
int ret; | |
st.zalloc = Z_NULL; | |
st.zfree = Z_NULL; | |
st.opaque = Z_NULL; | |
st.data_type = Z_TEXT; | |
if ((ret = deflateInit(&st, Z_DEFAULT_COMPRESSION)) != Z_OK) { | |
fprintf(stderr, "Cannot init deflate\n"); | |
return; | |
} | |
if ((zbuf->data = malloc(buflen)) == NULL) { | |
fprintf(stderr, "ENOMEM\n"); | |
return; | |
} | |
st.avail_in = buflen; | |
st.next_in = buf; | |
st.next_out = zbuf->data; | |
st.avail_out = buflen; | |
if (deflate(&st, Z_FINISH) == Z_STREAM_ERROR) { | |
fprintf(stderr, "Failure in defalte compression\n"); | |
return; | |
} | |
zbuf->size = buflen - st.avail_out; | |
if (verbose) { | |
fprintf(stderr, "Compression: %d -> %d\n", buflen, zbuf->size); | |
} | |
deflateEnd(&st); | |
} | |
double gettimestamp() { | |
struct timeval tv; | |
gettimeofday(&tv, NULL); | |
double ret = tv.tv_sec; | |
double msec = ((double) (tv.tv_usec / 1000)) / 1000.0; | |
ret += msec; | |
return ret; | |
} | |
/* | |
* Create a GELF (JSON) record from `line'. If `line' contains | |
* JSON itself, parse that and add individual elements to the JSON. | |
*/ | |
char *JSONify(char *line, char *topic) | |
{ | |
struct json_object *parsed = NULL; | |
char *json_str = NULL; | |
const char *s; | |
if (!line || !*line) | |
return (NULL); | |
json_object *root = json_object_new_object(); | |
json_object *version = json_object_new_string("1.1"); | |
json_object *host = json_object_new_string("mqtt2gelf"); | |
json_object *shortmessage = json_object_new_string(line); | |
json_object *fac = json_object_new_string(FACILITY); | |
json_object *level = json_object_new_int(LEVEL); | |
json_object *timestamp = json_object_new_double(gettimestamp()); | |
/* Attach elements to root object */ | |
json_object_object_add(root, "timestamp", timestamp); | |
json_object_object_add(root, "host", host); | |
json_object_object_add(root, "short_message", shortmessage); | |
json_object_object_add(root, "facility", fac); | |
json_object_object_add(root, "level", level); | |
json_object_object_add(root, "version", version); | |
/* MQTT elements */ | |
json_object_object_add(root, "topic", json_object_new_string(topic)); | |
/* JPM */ | |
json_object_object_add(root, "jp_int", json_object_new_string("JP.Mens")); | |
/* | |
* Bug in json-c 0.11 aparently: if the string starts with a digit, | |
* _parse dumps core. | |
*/ | |
if (!isdigit(*line) && (parsed = json_tokener_parse(line)) != NULL) { | |
// enum json_type type; | |
json_object_object_foreach(parsed, key, val) { | |
json_object_object_add(root, key, val); | |
/* | |
* switch (type) { case json_type_string: | |
* json_object_object_add(object, key, val); | |
* break; } | |
*/ | |
} | |
} | |
s = json_object_to_json_string_ext(root, verbose ? | |
JSON_C_TO_STRING_PRETTY | JSON_C_TO_STRING_SPACED : | |
JSON_C_TO_STRING_PLAIN); | |
if (s && *s) { | |
json_str = strdup(s); | |
} | |
json_object_put(root); | |
if (parsed != NULL) { | |
json_object_put(parsed); | |
} | |
return (json_str); | |
} | |
void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *m) | |
{ | |
/* | |
* mosquitto_message-> | |
* int mid; | |
* char *topic; | |
* void *payload; | |
* int payloadlen; | |
* int qos; | |
* bool retain; | |
*/ | |
struct userdata *ud = (struct userdata *)userdata; | |
char *jsonstr; | |
struct zbuf zbuf; | |
if (m->retain || (m->payloadlen < 1)) | |
return; | |
/* FIXME: what to do with binary payloads? */ | |
printf("%s %s\n", m->topic, m->payload); | |
if ((jsonstr = JSONify(m->payload, m->topic)) == NULL) { | |
return; | |
} | |
if (verbose) { | |
printf("%s\n", jsonstr); | |
} | |
zbuf.size = -1; | |
z_compress(&zbuf, jsonstr, strlen(jsonstr)); | |
if (zbuf.size == -1) { | |
fprintf(stderr, "Cannot send compressed; falling back to uncompressed\n"); | |
sendgelf(ud, jsonstr, strlen(jsonstr)); | |
} else { | |
sendgelf(ud, zbuf.data, zbuf.size); | |
free(zbuf.data); | |
} | |
free(jsonstr); | |
} | |
static void catcher(int signo) | |
{ | |
run = 0; | |
} | |
void udp_connect(struct userdata *ud, char *host, char *port) | |
{ | |
struct addrinfo hints, *res, *ressave; | |
int n; | |
bzero(&hints, sizeof(struct addrinfo)); | |
hints.ai_family = AF_UNSPEC; | |
hints.ai_socktype = SOCK_DGRAM; | |
hints.ai_protocol = IPPROTO_UDP; | |
if ((n=getaddrinfo(host, port, &hints, &res)) != 0) { | |
fprintf(stderr, "UDP error for %s, %s: %s", host, port, gai_strerror(n)); | |
exit(-1); | |
} | |
ressave=res; | |
do { | |
ud->udpsock = socket(res->ai_family, res->ai_socktype, res->ai_protocol); | |
if (ud->udpsock > 0) { | |
break; | |
} | |
} while ((res = res->ai_next) != NULL); | |
ud->sa = malloc(res->ai_addrlen); | |
memcpy(ud->sa, res->ai_addr, res->ai_addrlen); | |
ud->salen = res->ai_addrlen; | |
freeaddrinfo(ressave); | |
} | |
int main(int argc, char **argv) | |
{ | |
struct mosquitto *mosq = NULL; | |
char err[1024], clientid[24]; | |
int rc, mid; | |
struct userdata userdata; | |
udp_connect(&userdata, GRAYLOG_HOST, GELF_PORT); | |
mosquitto_lib_init(); | |
memset(clientid, 0, 24); | |
snprintf(clientid, 23, "subber_%d", getpid()); | |
signal(SIGINT, catcher); | |
mosq = mosquitto_new(clientid, true, &userdata); | |
if (!mosq) { | |
fprintf(stderr, "Error: Out of memory.\n"); | |
mosquitto_lib_cleanup(); | |
return 1; | |
} | |
mosquitto_message_callback_set(mosq, on_message); | |
rc = mosquitto_connect(mosq, "127.0.0.1", 1883, 60); | |
if (rc) { | |
if (rc == MOSQ_ERR_ERRNO) { | |
strerror_r(errno, err, 1024); | |
fprintf(stderr, "Error: %s\n", err); | |
} else { | |
fprintf(stderr, "Unable to connect (%d).\n", rc); | |
} | |
mosquitto_lib_cleanup(); | |
return rc; | |
} | |
mosquitto_subscribe(mosq, &mid, "gelf/#", 0); | |
mosquitto_subscribe(mosq, &mid, "#", 0); | |
while (run) { | |
rc = mosquitto_loop(mosq, -1, 1); | |
if (run && rc) { | |
sleep(20); | |
mosquitto_reconnect(mosq); | |
} | |
} | |
mosquitto_disconnect(mosq); | |
mosquitto_destroy(mosq); | |
mosquitto_lib_cleanup(); | |
return 0; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment