Skip to content

Instantly share code, notes, and snippets.

@jedi4ever
Created December 21, 2011 11:13
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jedi4ever/1505663 to your computer and use it in GitHub Desktop.
Save jedi4ever/1505663 to your computer and use it in GitHub Desktop.
extended nagios-zmq module (now does notifications too)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <json/json.h>
#include <pthread.h>
#include <uuid/uuid.h>
#include "include/zhelpers.h"
#include "include/nagios/objects.h"
#include "include/nagios/nagios.h"
#include "include/nagios/nebstructs.h"
#include "include/nagios/broker.h"
#include "include/nagios/nebmodules.h"
#include "include/nagios/nebcallbacks.h"
/* Default TCP ports used to initialise g_zmq_in_port / g_zmq_out_port. */
#define DEFAULT_ZMQ_IN_PORT 5555
#define DEFAULT_ZMQ_OUT_PORT 6666
/*
 * Log priorities passed to logger2(). Only LG_INFO carries its own value; the
 * remaining levels alias LOG_INFO, which is not defined in this file.
 * NOTE(review): presumably syslog's LOG_INFO -- confirm it is in scope before
 * using any of the aliases.
 */
#define LG_INFO 262144
#define LG_WARN LOG_INFO
#define LG_ERR LOG_INFO
#define LG_CRIT LOG_INFO
#define LG_DEBUG LOG_INFO
#define LG_ALERT LOG_INFO
/* Upper bound for a serialized message, in bytes. Parenthesized so the macro
 * keeps its value when it is expanded inside a larger expression. */
#define MAX_MESSAGE (1024*1024)
/* Module handle received in nebmodule_init() and passed to neb_register_callback(). */
void *nagios_zmq_module_handle = NULL;
/* ZeroMQ context created by start_threads2(); every socket in this file is created from it. */
void *g_context;
/* PUB socket created by zmq_publisher_start(); the send_*2() functions publish through it. */
void *g_publisher;
/* Port numbers; parse_arguments() overrides the defaults from the module arguments. */
int g_zmq_in_port = DEFAULT_ZMQ_IN_PORT;
int g_zmq_out_port = DEFAULT_ZMQ_OUT_PORT;
/* Thread ids filled in by pthread_create() in start_threads2(); both threads are detached there. */
pthread_t g_zmq_forwarder_thread;
pthread_t g_zmq_publisher_thread;
/* NOTE(review): macro from the Nagios headers; presumably exports the NEB API
 * version this module was built against -- confirm against nebmodules.h. */
NEB_API_VERSION(CURRENT_NEB_API_VERSION)
/*
 * Format a printf-style message, prefix it with "zmq queue: " and pass the
 * result to write_to_all_logs() together with the given priority.
 *
 * priority  value forwarded unchanged to write_to_all_logs()
 * loginfo   printf-style format string, followed by its arguments
 *
 * The line is assembled in an 8192-byte local buffer; vsnprintf() truncates
 * anything that does not fit.
 */
void logger2(int priority, const char *loginfo, ...)
{
    char line[8192];
    size_t used;
    va_list args;

    /* Write the fixed prefix first, then append the caller's text after it. */
    snprintf(line, 20, "zmq queue: ");
    used = strlen(line);

    va_start(args, loginfo);
    vsnprintf(line + used, sizeof(line) - used, loginfo, args);
    va_end(args);

    write_to_all_logs(line, priority);
}
void parse_arguments(const char *args) {
char arguments[1024];
char *arg_term;
// no arguments
if(!args) return;
strncpy(arguments, args, 1024);
arg_term = strtok(arguments, " =");
while(arg_term != NULL) {
char *key, *value;
key = arg_term;
arg_term = strtok(NULL, " =");
value = arg_term;
if(!strncmp(key, "inport", 4)) {
g_zmq_in_port = atoi(value);
}
if(!strncmp(key, "outport", 4)) {
g_zmq_out_port = atoi(value);
}
arg_term = strtok(NULL, " =");
}
}
void *zmq_publisher_start() {
logger2(LG_INFO, "start zmq publisher.");
// init socket
g_publisher = zmq_socket(g_context, ZMQ_PUB);
zmq_connect(g_publisher, "tcp://localhost:5555");
}
void *zmq_forwarder_start() {
logger2(LG_INFO, "start zmq forwarder.");
// Socket facing clients
void *incoming = zmq_socket(g_context, ZMQ_SUB);
zmq_bind(incoming, "tcp://*:5555");
zmq_setsockopt(incoming, ZMQ_SUBSCRIBE, "", 0);
// Socket facing services
void *outgoing = zmq_socket(g_context, ZMQ_PUB);
zmq_bind(outgoing, "tcp://*:6666");
// Start built-in device
zmq_device(ZMQ_FORWARDER, incoming, outgoing);
// We never get here…
zmq_close(incoming);
zmq_close(outgoing);
zmq_term(g_context);
}
void start_threads2() {
// init zmq context
g_context = zmq_init(1);
// start forwarder thread
pthread_create(&g_zmq_forwarder_thread, 0, zmq_forwarder_start, (void *)0);
pthread_detach(g_zmq_forwarder_thread);
// start publisher thread
pthread_create(&g_zmq_publisher_thread, 0, zmq_publisher_start, (void *)0);
pthread_detach(g_zmq_publisher_thread);
}
/*
 * Add a string member to a JSON object.
 *
 * jobj   object to extend; returned unchanged when it or key is NULL
 * key    member name
 * value  member value; the string is copied by json_object_new_string().
 *        A NULL value is not passed to json_object_new_string(); the member
 *        is added with a NULL value instead.
 *        NOTE(review): json-c is expected to serialize that as JSON null --
 *        confirm for the json-c version in use.
 *
 * Returns jobj so calls can be chained.
 */
json_object * json_add_pair(json_object *jobj, const char *key, const char *value) {
    json_object *jstring = NULL;

    if (jobj == NULL || key == NULL) {
        return jobj;
    }

    if (value != NULL) {
        jstring = json_object_new_string(value);
    }
    json_object_object_add(jobj, key, jstring);
    return jobj;
}
/*
 * Generate a UUID and return it in textual form.
 *
 * Returns a newly allocated, NUL-terminated string that the caller must
 * free(), or NULL when the allocation fails.
 */
char * create_uuid(void) {
    /* 36 characters of text plus the terminating NUL written by uuid_unparse(). */
    enum { UUID_TEXT_SIZE = 37 };
    uuid_t uuid;
    char *string = malloc(UUID_TEXT_SIZE);

    if (string == NULL) {
        return NULL;
    }

    uuid_generate(uuid);
    uuid_unparse(uuid, string);
    return string;
}
/*
 * Serialize a processed service check as a JSON event and publish it on
 * g_publisher.
 *
 * The event has the members "id" (fresh UUID), "context" ("SERVICECHECK"),
 * "source" ("NAGIOS"), "timestamp" and "payload"; all values are emitted as
 * strings. String fields of check_data that are NULL are left out of the
 * payload.
 *
 * Returns 0 on success, -1 when check_data is NULL, the publisher socket is
 * not available yet, the event could not be built, or sending failed.
 */
int send_servicecheck2(nebstruct_service_check_data *check_data) {
    time_t ts = time(NULL);
    char cast_buffer[64];
    char *uuid;
    const char *json_text;
    json_object *jevent;
    json_object *jobj;
    int rc = 0;

    /* g_publisher is set by the publisher thread and may not exist yet. */
    if (check_data == NULL || g_publisher == NULL) {
        return -1;
    }

    jevent = json_object_new_object();
    jobj = json_object_new_object();
    if (jevent == NULL || jobj == NULL) {
        if (jevent != NULL) {
            json_object_put(jevent);
        }
        if (jobj != NULL) {
            json_object_put(jobj);
        }
        return -1;
    }

    /* event envelope */
    uuid = create_uuid();
    if (uuid != NULL) {
        json_add_pair(jevent, "id", uuid);
        free(uuid);
    }
    json_add_pair(jevent, "context", "SERVICECHECK");
    json_add_pair(jevent, "source", "NAGIOS");
    snprintf(cast_buffer, sizeof(cast_buffer), "%lld", (long long)ts);
    json_add_pair(jevent, "timestamp", cast_buffer);

    /* payload */
    snprintf(cast_buffer, sizeof(cast_buffer), "%i", check_data->current_attempt);
    json_add_pair(jobj, "current_attempt", cast_buffer);
    snprintf(cast_buffer, sizeof(cast_buffer), "%i", check_data->max_attempts);
    json_add_pair(jobj, "max_attempts", cast_buffer);
    snprintf(cast_buffer, sizeof(cast_buffer), "%i", check_data->state_type);
    json_add_pair(jobj, "state_type", cast_buffer);
    snprintf(cast_buffer, sizeof(cast_buffer), "%i", check_data->state);
    json_add_pair(jobj, "state", cast_buffer);
    snprintf(cast_buffer, sizeof(cast_buffer), "%lld", (long long)check_data->timestamp.tv_sec);
    json_add_pair(jobj, "timestamp", cast_buffer);
    snprintf(cast_buffer, sizeof(cast_buffer), "%f", check_data->execution_time);
    json_add_pair(jobj, "execution_time", cast_buffer);
    if (check_data->host_name) {
        json_add_pair(jobj, "hostname", check_data->host_name);
    }
    if (check_data->service_description) {
        json_add_pair(jobj, "service", check_data->service_description);
    }
    if (check_data->output) {
        json_add_pair(jobj, "output", check_data->output);
    }
    if (check_data->perf_data) {
        json_add_pair(jobj, "performance", check_data->perf_data);
    }

    /* From here on jevent owns jobj: releasing jevent releases the payload too. */
    json_object_object_add(jevent, "payload", jobj);

    json_text = json_object_to_json_string(jevent);
    if (json_text == NULL) {
        rc = -1;
    } else {
        logger2(LG_INFO, "'%s'\n", json_text);
        /* The cast only drops const for s_send()'s parameter type; the text
         * lives in a buffer owned by jevent.
         * NOTE(review): assumes s_send() returns a negative value on failure. */
        if (s_send(g_publisher, (char *)json_text) < 0) {
            rc = -1;
        }
    }

    json_object_put(jevent);
    return rc;
}
/*
 * Serialize a notification as a JSON event and publish it on g_publisher.
 *
 * The event has the members "id" (fresh UUID), "context" ("NOTIFICATION"),
 * "source" ("NAGIOS"), "timestamp" and "payload"; all values are emitted as
 * strings. String fields of notification_data that are NULL are left out of
 * the payload.
 *
 * The ack_author, ack_data, escalated and contacts_notified fields were
 * disabled in the original code and are still not published.
 *
 * Returns 0 on success, -1 when notification_data is NULL, the publisher
 * socket is not available yet, the event could not be built, or sending
 * failed.
 */
int send_notification2(nebstruct_notification_data *notification_data) {
    time_t ts = time(NULL);
    char cast_buffer[64];
    char *uuid;
    const char *json_text;
    json_object *jevent;
    json_object *jobj;
    int rc = 0;

    /* g_publisher is set by the publisher thread and may not exist yet. */
    if (notification_data == NULL || g_publisher == NULL) {
        return -1;
    }

    jevent = json_object_new_object();
    jobj = json_object_new_object();
    if (jevent == NULL || jobj == NULL) {
        if (jevent != NULL) {
            json_object_put(jevent);
        }
        if (jobj != NULL) {
            json_object_put(jobj);
        }
        return -1;
    }

    /* event envelope */
    uuid = create_uuid();
    if (uuid != NULL) {
        json_add_pair(jevent, "id", uuid);
        free(uuid);
    }
    json_add_pair(jevent, "context", "NOTIFICATION");
    json_add_pair(jevent, "source", "NAGIOS");
    snprintf(cast_buffer, sizeof(cast_buffer), "%lld", (long long)ts);
    json_add_pair(jevent, "timestamp", cast_buffer);

    /* payload */
    if (notification_data->host_name) {
        json_add_pair(jobj, "hostname", notification_data->host_name);
    }
    snprintf(cast_buffer, sizeof(cast_buffer), "%lld", (long long)notification_data->start_time.tv_sec);
    json_add_pair(jobj, "start_time", cast_buffer);
    snprintf(cast_buffer, sizeof(cast_buffer), "%lld", (long long)notification_data->end_time.tv_sec);
    json_add_pair(jobj, "end_time", cast_buffer);
    if (notification_data->service_description) {
        json_add_pair(jobj, "service_description", notification_data->service_description);
    }
    snprintf(cast_buffer, sizeof(cast_buffer), "%i", notification_data->reason_type);
    json_add_pair(jobj, "reason_type", cast_buffer);
    snprintf(cast_buffer, sizeof(cast_buffer), "%i", notification_data->state);
    json_add_pair(jobj, "state", cast_buffer);
    if (notification_data->output) {
        json_add_pair(jobj, "output", notification_data->output);
    }

    /* From here on jevent owns jobj: releasing jevent releases the payload too. */
    json_object_object_add(jevent, "payload", jobj);

    json_text = json_object_to_json_string(jevent);
    if (json_text == NULL) {
        rc = -1;
    } else {
        logger2(LG_INFO, "'%s'\n", json_text);
        /* The cast only drops const for s_send()'s parameter type; the text
         * lives in a buffer owned by jevent.
         * NOTE(review): assumes s_send() returns a negative value on failure. */
        if (s_send(g_publisher, (char *)json_text) < 0) {
            rc = -1;
        }
    }

    json_object_put(jevent);
    return rc;
}
/*
 * Serialize a processed host check as a JSON event and publish it on
 * g_publisher.
 *
 * The event has the members "id" (fresh UUID), "context" ("HOSTCHECK"),
 * "source" ("NAGIOS"), "timestamp" and "payload"; all values are emitted as
 * strings. String fields of check_data that are NULL are left out of the
 * payload.
 *
 * Returns 0 on success, -1 when check_data is NULL, the publisher socket is
 * not available yet, the event could not be built, or sending failed.
 */
int send_hostcheck2(nebstruct_host_check_data *check_data) {
    time_t ts = time(NULL);
    char cast_buffer[64];
    char *uuid;
    const char *json_text;
    json_object *jevent;
    json_object *jobj;
    int rc = 0;

    /* g_publisher is set by the publisher thread and may not exist yet. */
    if (check_data == NULL || g_publisher == NULL) {
        return -1;
    }

    jevent = json_object_new_object();
    jobj = json_object_new_object();
    if (jevent == NULL || jobj == NULL) {
        if (jevent != NULL) {
            json_object_put(jevent);
        }
        if (jobj != NULL) {
            json_object_put(jobj);
        }
        return -1;
    }

    /* event envelope */
    uuid = create_uuid();
    if (uuid != NULL) {
        json_add_pair(jevent, "id", uuid);
        free(uuid);
    }
    json_add_pair(jevent, "context", "HOSTCHECK");
    json_add_pair(jevent, "source", "NAGIOS");
    snprintf(cast_buffer, sizeof(cast_buffer), "%lld", (long long)ts);
    json_add_pair(jevent, "timestamp", cast_buffer);

    /* payload */
    snprintf(cast_buffer, sizeof(cast_buffer), "%i", check_data->current_attempt);
    json_add_pair(jobj, "current_attempt", cast_buffer);
    snprintf(cast_buffer, sizeof(cast_buffer), "%i", check_data->max_attempts);
    json_add_pair(jobj, "max_attempts", cast_buffer);
    snprintf(cast_buffer, sizeof(cast_buffer), "%i", check_data->state_type);
    json_add_pair(jobj, "state_type", cast_buffer);
    snprintf(cast_buffer, sizeof(cast_buffer), "%i", check_data->state);
    json_add_pair(jobj, "state", cast_buffer);
    snprintf(cast_buffer, sizeof(cast_buffer), "%lld", (long long)check_data->timestamp.tv_sec);
    json_add_pair(jobj, "timestamp", cast_buffer);
    snprintf(cast_buffer, sizeof(cast_buffer), "%f", check_data->execution_time);
    json_add_pair(jobj, "execution_time", cast_buffer);
    if (check_data->host_name) {
        json_add_pair(jobj, "hostname", check_data->host_name);
    }
    if (check_data->output) {
        json_add_pair(jobj, "output", check_data->output);
    }
    if (check_data->perf_data) {
        json_add_pair(jobj, "performance", check_data->perf_data);
    }

    /* From here on jevent owns jobj: releasing jevent releases the payload too. */
    json_object_object_add(jevent, "payload", jobj);

    json_text = json_object_to_json_string(jevent);
    if (json_text == NULL) {
        rc = -1;
    } else {
        logger2(LG_INFO, "'%s'\n", json_text);
        /* The cast only drops const for s_send()'s parameter type; the text
         * lives in a buffer owned by jevent.
         * NOTE(review): assumes s_send() returns a negative value on failure. */
        if (s_send(g_publisher, (char *)json_text) < 0) {
            rc = -1;
        }
    }

    json_object_put(jevent);
    return rc;
}
/*
 * Callback for service and host check events.
 *
 * Logs every invocation. A service check whose type is
 * NEBTYPE_SERVICECHECK_PROCESSED is handed to send_servicecheck2(); a host
 * check whose type is NEBTYPE_HOSTCHECK_PROCESSED is handed to
 * send_hostcheck2(). Everything else is ignored. Always returns 0.
 */
int broker_check2(int event_type, void *data) {
    logger2(LG_INFO, "broker check - zmq");

    if (event_type == NEBCALLBACK_SERVICE_CHECK_DATA) {
        nebstruct_service_check_data *service_check = (nebstruct_service_check_data *)data;

        if (service_check->type != NEBTYPE_SERVICECHECK_PROCESSED) {
            return 0;
        }
        logger2(LG_INFO, "we got service check - zmq");
        send_servicecheck2(service_check);
        return 0;
    }

    if (event_type == NEBCALLBACK_HOST_CHECK_DATA) {
        nebstruct_host_check_data *host_check = (nebstruct_host_check_data *)data;

        if (host_check->type != NEBTYPE_HOSTCHECK_PROCESSED) {
            return 0;
        }
        logger2(LG_INFO, "we got host check - zmq");
        send_hostcheck2(host_check);
    }

    return 0;
}
/*
 * Callback for state change events. Both arguments are ignored and nothing is
 * done; the function always returns 0.
 */
int broker_state2(int event_type __attribute__ ((__unused__)), void *data __attribute__ ((__unused__))) {
    const int result = 0;

    return result;
}
/*
 * Callback for process events. When the event's type is
 * NEBTYPE_PROCESS_EVENTLOOPSTART it calls start_threads2(); every other
 * process event only produces the two leading log lines. Always returns 0.
 */
int broker_process2(int event_type __attribute__ ((__unused__)), void *data) {
    struct nebstruct_process_struct *process_info;

    logger2(LG_INFO, "debug 1234");
    process_info = (struct nebstruct_process_struct *)data;
    logger2(LG_INFO, "about to start threads if correct event");

    if (process_info->type != NEBTYPE_PROCESS_EVENTLOOPSTART) {
        return 0;
    }

    logger2(LG_INFO, "starting threads");
    start_threads2();
    return 0;
}
/*
 * Callback for notification events: logs the event and passes the data,
 * treated as nebstruct_notification_data, to send_notification2().
 * Always returns 0.
 */
int broker_notification2(int event_type __attribute__ ((__unused__)), void *data) {
    logger2(LG_INFO, "got notification");
    send_notification2((nebstruct_notification_data *)data);
    return 0;
}
/*
 * Register all broker callbacks of this module with priority 2, in the order
 * listed in the table. Return values of neb_register_callback() are ignored.
 */
void register_callbacks2() {
    static const struct {
        int callback_type;
        int (*handler)(int, void *);
    } registrations[] = {
        { NEBCALLBACK_STATE_CHANGE_DATA,  broker_state2 },
        { NEBCALLBACK_SERVICE_CHECK_DATA, broker_check2 },
        { NEBCALLBACK_HOST_CHECK_DATA,    broker_check2 },
        { NEBCALLBACK_NOTIFICATION_DATA,  broker_notification2 },
        // used for starting threads
        { NEBCALLBACK_PROCESS_DATA,       broker_process2 },
    };
    size_t idx;

    for (idx = 0; idx < sizeof(registrations) / sizeof(registrations[0]); idx++) {
        neb_register_callback(registrations[idx].callback_type, nagios_zmq_module_handle, 2,
                              registrations[idx].handler);
    }
}
/*
 * Deregister every broker callback of this module, in the order listed in the
 * table. Return values of neb_deregister_callback() are ignored.
 */
void deregister_callbacks2() {
    static const struct {
        int callback_type;
        int (*handler)(int, void *);
    } registrations[] = {
        { NEBCALLBACK_STATE_CHANGE_DATA,  broker_state2 },
        { NEBCALLBACK_SERVICE_CHECK_DATA, broker_check2 },
        { NEBCALLBACK_HOST_CHECK_DATA,    broker_check2 },
        { NEBCALLBACK_NOTIFICATION_DATA,  broker_notification2 },
        { NEBCALLBACK_PROCESS_DATA,       broker_process2 },
    };
    size_t idx;

    for (idx = 0; idx < sizeof(registrations) / sizeof(registrations[0]); idx++) {
        neb_deregister_callback(registrations[idx].callback_type, registrations[idx].handler);
    }
}
/*
 * this function gets called when the module is loaded by the event broker
 *
 * flags   unused
 * args    module argument string, handed to parse_arguments(); may be NULL
 * handle  module handle, stored in nagios_zmq_module_handle and used for the
 *         callback registrations
 *
 * Registers the callbacks directly with priority 0 (register_callbacks2()
 * would use priority 2 and is not called here).
 *
 * Returns 0, matching nebmodule_deinit(). The function previously had no
 * return statement, so the caller received an indeterminate value.
 */
int nebmodule_init(int flags __attribute__ ((__unused__)), char *args, void *handle) {
    nagios_zmq_module_handle = handle;
    logger2(LG_INFO, "nagios-zmq by Marius Sturm");
    parse_arguments(args);

    neb_register_callback(NEBCALLBACK_STATE_CHANGE_DATA, nagios_zmq_module_handle, 0, broker_state2);
    neb_register_callback(NEBCALLBACK_SERVICE_CHECK_DATA, nagios_zmq_module_handle, 0, broker_check2);
    neb_register_callback(NEBCALLBACK_HOST_CHECK_DATA, nagios_zmq_module_handle, 0, broker_check2);
    neb_register_callback(NEBCALLBACK_NOTIFICATION_DATA, nagios_zmq_module_handle, 0, broker_notification2);
    /* broker_process2() starts the ZeroMQ threads once the event loop starts. */
    neb_register_callback(NEBCALLBACK_PROCESS_DATA, nagios_zmq_module_handle, 0, broker_process2);

    logger2(LG_INFO, "successfully finished initialization");
    return 0;
}
/*
 * Called when the module is unloaded: deregisters all callbacks, then closes
 * the publisher socket and terminates the ZeroMQ context.
 *
 * g_publisher and g_context are only created after the event loop has
 * started (see broker_process2() / start_threads2()), so either may still be
 * NULL here; NULL handles are skipped. Both globals are reset to NULL after
 * release. Always returns 0.
 */
int nebmodule_deinit(int flags __attribute__ ((__unused__)), int reason __attribute__ ((__unused__))) {
    logger2(LG_INFO, "deinitializing");
    deregister_callbacks2();

    //deinit zmq
    if (g_publisher != NULL) {
        zmq_close(g_publisher);
        g_publisher = NULL;
    }
    if (g_context != NULL) {
        zmq_term(g_context);
        g_context = NULL;
    }
    return 0;
}
[1324465955] zmq queue: '{ "id": "8c06b0a5-4540-444d-b883-a309be562a7d", "context": "NOTIFICATION", "source": "NAGIOS", "timestamp": "1324465955", "payload": { "hostname": "localhost", "start_time": "1324465955", "end_time": "1324465955", "service_description": "Swap Usage", "reason_type": "0", "state": "2", "output": "SWAP CRITICAL - 100% free (0 MB out of 0 MB)" } }'
[1324465955] zmq queue: we got service check - zmq
[1324465955] zmq queue: '{ "id": "aecf8a27-3fe1-4b63-88cd-5ee7e40ab2af", "context": "SERVICECHECK", "source": "NAGIOS", "timestamp": "1324465955", "payload": { "current_attempt": "1", "max_attempts": "1", "state_type": "1", "state": "2", "timestamp": "1324465955", "execution_time": "0.012393", "hostname": "localhost" , "service": "Swap Usage", "output": "SWAP CRITICAL - 100% free (0 MB out of 0 MB)", "performance": "swap= 0MB;0;0;0;0" } }'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment