Skip to content

Instantly share code, notes, and snippets.

@jpmens
Last active July 30, 2019 14:21
Show Gist options
  • Save jpmens/10820081 to your computer and use it in GitHub Desktop.
Save jpmens/10820081 to your computer and use it in GitHub Desktop.
Send MQTT payloads (incl JSON) as UDP GELF (compressed) to Graylog2
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <ctype.h>
#include <mosquitto.h>
#include <json/json.h>
#include <zlib.h>
/*
* Jan-Piet Mens, April 2014
* Listen to any number of MQTT topics and send payloads of received
* messages to Graylog2 (UDP GELF).
* If the MQTT payload contains JSON, attempt to extract the elements into the
* GELF payload.
*
* cc -Wall -Werror sub2gelf.c -o sub2gelf -lmosquitto -ljson-c -lz
*/
#define GRAYLOG_HOST "ubu.prox"
#define GELF_PORT "12201"
/* 0=Emerg, 1=Alert, 2=Crit, 3=Error, 4=Warn, 5=Notice, 6=Info */
#define LEVEL 6
#define FACILITY "jp-sender"
struct userdata {
int udpsock;
struct sockaddr *sa;
socklen_t salen;
};
static int run = 1;
static int verbose = 1;
void sendgelf(struct userdata *ud, const char *payload, int payloadlen)
{
int rc;
rc = sendto(ud->udpsock, payload, payloadlen, 0, ud->sa, ud->salen);
if (rc < 0) {
perror("sendto");
}
}
struct zbuf {
int size;
void *data;
};
void z_compress(struct zbuf *zbuf, void *buf, int buflen)
{
z_stream st;
int ret;
st.zalloc = Z_NULL;
st.zfree = Z_NULL;
st.opaque = Z_NULL;
st.data_type = Z_TEXT;
if ((ret = deflateInit(&st, Z_DEFAULT_COMPRESSION)) != Z_OK) {
fprintf(stderr, "Cannot init deflate\n");
return;
}
if ((zbuf->data = malloc(buflen)) == NULL) {
fprintf(stderr, "ENOMEM\n");
return;
}
st.avail_in = buflen;
st.next_in = buf;
st.next_out = zbuf->data;
st.avail_out = buflen;
if (deflate(&st, Z_FINISH) == Z_STREAM_ERROR) {
fprintf(stderr, "Failure in defalte compression\n");
return;
}
zbuf->size = buflen - st.avail_out;
if (verbose) {
fprintf(stderr, "Compression: %d -> %d\n", buflen, zbuf->size);
}
deflateEnd(&st);
}
double gettimestamp() {
struct timeval tv;
gettimeofday(&tv, NULL);
double ret = tv.tv_sec;
double msec = ((double) (tv.tv_usec / 1000)) / 1000.0;
ret += msec;
return ret;
}
/*
* Create a GELF (JSON) record from `line'. If `line' contains
* JSON itself, parse that and add individual elements to the JSON.
*/
char *JSONify(char *line, char *topic)
{
struct json_object *parsed = NULL;
char *json_str = NULL;
const char *s;
if (!line || !*line)
return (NULL);
json_object *root = json_object_new_object();
json_object *version = json_object_new_string("1.1");
json_object *host = json_object_new_string("mqtt2gelf");
json_object *shortmessage = json_object_new_string(line);
json_object *fac = json_object_new_string(FACILITY);
json_object *level = json_object_new_int(LEVEL);
json_object *timestamp = json_object_new_double(gettimestamp());
/* Attach elements to root object */
json_object_object_add(root, "timestamp", timestamp);
json_object_object_add(root, "host", host);
json_object_object_add(root, "short_message", shortmessage);
json_object_object_add(root, "facility", fac);
json_object_object_add(root, "level", level);
json_object_object_add(root, "version", version);
/* MQTT elements */
json_object_object_add(root, "topic", json_object_new_string(topic));
/* JPM */
json_object_object_add(root, "jp_int", json_object_new_string("JP.Mens"));
/*
* Bug in json-c 0.11 aparently: if the string starts with a digit,
* _parse dumps core.
*/
if (!isdigit(*line) && (parsed = json_tokener_parse(line)) != NULL) {
// enum json_type type;
json_object_object_foreach(parsed, key, val) {
json_object_object_add(root, key, val);
/*
* switch (type) { case json_type_string:
* json_object_object_add(object, key, val);
* break; }
*/
}
}
s = json_object_to_json_string_ext(root, verbose ?
JSON_C_TO_STRING_PRETTY | JSON_C_TO_STRING_SPACED :
JSON_C_TO_STRING_PLAIN);
if (s && *s) {
json_str = strdup(s);
}
json_object_put(root);
if (parsed != NULL) {
json_object_put(parsed);
}
return (json_str);
}
void on_message(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *m)
{
/*
* mosquitto_message->
* int mid;
* char *topic;
* void *payload;
* int payloadlen;
* int qos;
* bool retain;
*/
struct userdata *ud = (struct userdata *)userdata;
char *jsonstr;
struct zbuf zbuf;
if (m->retain || (m->payloadlen < 1))
return;
/* FIXME: what to do with binary payloads? */
printf("%s %s\n", m->topic, m->payload);
if ((jsonstr = JSONify(m->payload, m->topic)) == NULL) {
return;
}
if (verbose) {
printf("%s\n", jsonstr);
}
zbuf.size = -1;
z_compress(&zbuf, jsonstr, strlen(jsonstr));
if (zbuf.size == -1) {
fprintf(stderr, "Cannot send compressed; falling back to uncompressed\n");
sendgelf(ud, jsonstr, strlen(jsonstr));
} else {
sendgelf(ud, zbuf.data, zbuf.size);
free(zbuf.data);
}
free(jsonstr);
}
static void catcher(int signo)
{
run = 0;
}
void udp_connect(struct userdata *ud, char *host, char *port)
{
struct addrinfo hints, *res, *ressave;
int n;
bzero(&hints, sizeof(struct addrinfo));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_protocol = IPPROTO_UDP;
if ((n=getaddrinfo(host, port, &hints, &res)) != 0) {
fprintf(stderr, "UDP error for %s, %s: %s", host, port, gai_strerror(n));
exit(-1);
}
ressave=res;
do {
ud->udpsock = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
if (ud->udpsock > 0) {
break;
}
} while ((res = res->ai_next) != NULL);
ud->sa = malloc(res->ai_addrlen);
memcpy(ud->sa, res->ai_addr, res->ai_addrlen);
ud->salen = res->ai_addrlen;
freeaddrinfo(ressave);
}
int main(int argc, char **argv)
{
struct mosquitto *mosq = NULL;
char err[1024], clientid[24];
int rc, mid;
struct userdata userdata;
udp_connect(&userdata, GRAYLOG_HOST, GELF_PORT);
mosquitto_lib_init();
memset(clientid, 0, 24);
snprintf(clientid, 23, "subber_%d", getpid());
signal(SIGINT, catcher);
mosq = mosquitto_new(clientid, true, &userdata);
if (!mosq) {
fprintf(stderr, "Error: Out of memory.\n");
mosquitto_lib_cleanup();
return 1;
}
mosquitto_message_callback_set(mosq, on_message);
rc = mosquitto_connect(mosq, "127.0.0.1", 1883, 60);
if (rc) {
if (rc == MOSQ_ERR_ERRNO) {
strerror_r(errno, err, 1024);
fprintf(stderr, "Error: %s\n", err);
} else {
fprintf(stderr, "Unable to connect (%d).\n", rc);
}
mosquitto_lib_cleanup();
return rc;
}
mosquitto_subscribe(mosq, &mid, "gelf/#", 0);
mosquitto_subscribe(mosq, &mid, "#", 0);
while (run) {
rc = mosquitto_loop(mosq, -1, 1);
if (run && rc) {
sleep(20);
mosquitto_reconnect(mosq);
}
}
mosquitto_disconnect(mosq);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment