DATATYPE::HOSTPERFDATA TIMET::1365442741 HOSTNAME::hs6bms HOSTPERFDATA::rta=13.191ms;300.000;500.000;0; pl=0%;20;60;; HOSTCHECKCOMMAND::check_host_alive HOSTSTATE::UP HOSTSTATETYPE::HARD GRAPHITEPREFIX::nagios GRAPHITEPOSTFIX::host-ping
DATATYPE::HOSTPERFDATA TIMET::1365442741 HOSTNAME::risb116-2950 HOSTPERFDATA::rta=2.096ms;300.000;500.000;0; pl=0%;20;60;; HOSTCHECKCOMMAND::check_host_alive HOSTSTATE::UP HOSTSTATETYPE::HARD GRAPHITEPREFIX::nagios GRAPHITEPOSTFIX::host-ping
DATATYPE::HOSTPERFDATA TIMET::1365442741 HOSTNAME::roof-2960g HOSTPERFDATA::rta=0.679ms;300.000;500.000;0; pl=0%;20;60;; HOSTCHECKCOMMAND::check_host_alive HOSTSTATE::UP HOSTSTATETYPE::HARD GRAPHITEPREFIX::nagios GRAPHITEPOSTFIX::host-ping
DATATYPE::HOSTPERFDATA TIMET::1365442741 HOSTNAME::rose-2950-1 HOSTPERFDATA::rta=1.534ms;300.000;500.000;0; pl=0%;20;60;; HOSTCHECKCOMMAND::check_host_alive HOSTSTATE::UP HOSTSTATETYPE::HARD GRAPHITEPREFIX::nagios GRAPHITEPOSTFIX::host-ping
DATATYPE::HOSTPERFDATA TIMET::1365442741 HOSTNAME::sambrookmodbus HOSTPERFDATA::rta=1.914ms;300.000;500.000;0; pl=0%;20;60;; HOSTCHECKCOMMAND::check_host_alive HOSTSTATE::UP HOSTSTATETYPE::HARD GRAPHITEPREFIX::nagios GRAPHITEPOSTFIX::host-ping
#!/usr/bin/env python
#
# Metricinga - A gevent-based performance data forwarder for Nagios/Icinga
#
# Author: Jeff Goldschrafe <jeff@holyhandgrenade.org>
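#
# Input format: one check result per line in the spool directory, with
# tab-delimited KEY::VALUE fields, as in the sample lines above (as
# rendered there the tabs may appear as plain spaces). A quick sketch of
# the extraction, equivalent to LineProcessor._extract_fields() below:
#
#     line = ("DATATYPE::HOSTPERFDATA\tTIMET::1365442741\t"
#             "HOSTNAME::hs6bms\t"
#             "HOSTPERFDATA::rta=13.191ms;300.000;500.000;0; pl=0%;20;60;;")
#     fields = dict(tok.split('::') for tok in line.split('\t')
#                   if tok.count('::') == 1)
#     print(fields['HOSTNAME'])   # -> hs6bms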
import cPickle as pickle
import logging
import logging.handlers
import os
import re
import signal
import socket
import struct
import sys
import time
from optparse import OptionGroup, OptionParser
from pprint import pformat, pprint

import lockfile
from daemon import DaemonContext

import gevent
import gevent.monkey
from gevent import Greenlet, Timeout
from gevent.queue import PriorityQueue

try:
    import gevent_inotifyx as inotify
    use_inotify = True
except ImportError, ex:
    use_inotify = False

gevent.monkey.patch_all()
#
# Utility classes
#

class Metric(object):
    """Represents a single datapoint of a system metric."""

    def __init__(self, path=None, value=0, timestamp=0, source=None):
        # Avoid a shared mutable default for the path list
        self.path = path if path is not None else []
        self.value = value
        self.timestamp = timestamp
        self.source = source
class PurgedFileFactory(object):
    """Manage state of PurgedFileToken instances.

    Singleton-like factory to ensure file paths are not shared between
    PurgedFileToken instances.
    """

    instances = {}

    @staticmethod
    def create(path):
        if PurgedFileFactory.instances.get(path):
            return None
        else:
            PurgedFileFactory.instances[path] = True
            return PurgedFileToken(path)

    @staticmethod
    def destroy(path):
        if path in PurgedFileFactory.instances:
            del PurgedFileFactory.instances[path]


class PurgedFileToken(object):
    """Deletes a file when the last reference to the token leaves scope."""

    def __init__(self, path):
        self.path = path

    def __del__(self):
        log.debug("Unlinking file `{0}'".format(self.path))
        os.remove(self.path)
        PurgedFileFactory.destroy(self.path)


class SourcedString(object):
    """Pairs a string with the PurgedFileToken it originated from.

    Allows the original source to be purged when all references to its
    data have been removed from scope.
    """

    def __init__(self, string_, source):
        self.string_ = string_
        self.source = source
#
# Message encapsulation classes
#

class ShutdownRequest(object):
    pass


class ParseFileRequest(object):
    def __init__(self, path):
        self.path = path


class ParseLineRequest(object):
    def __init__(self, line):
        self.line = line


class PublishMetricRequest(object):
    def __init__(self, metric):
        self.metric = metric
#
# Decorators
#

class event(object):
    """Descriptor that exposes a method name as a subscribable event."""

    def __init__(self, func):
        self.__doc__ = func.__doc__
        # The leading space keeps the instance-dict key from colliding
        # with any real attribute name
        self._key = ' ' + func.__name__

    def __get__(self, obj, cls):
        try:
            return obj.__dict__[self._key]
        except KeyError, exc:
            be = obj.__dict__[self._key] = boundevent()
            return be


class boundevent(object):
    """Callable list of subscribers; calling it fires every handler."""

    def __init__(self):
        self._fns = []

    def __call__(self, *args, **kwargs):
        for f in self._fns:
            f(*args, **kwargs)

    def subscribe(self, fn):
        self._fns.append(fn)

    def unsubscribe(self, fn):
        self._fns.remove(fn)
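# How the event plumbing is used: reading the decorated attribute lazily
# creates a boundevent, callers register handlers with subscribe(), and
# invoking the attribute fans out to every subscriber. A minimal sketch
# (Button/clicked are illustrative names, not part of Metricinga):
#
#     class Button(object):
#         @event
#         def on_click(self):
#             """Fired when the button is clicked."""
#
#     def clicked(who):
#         print("clicked by %s" % who)
#
#     b = Button()
#     b.on_click.subscribe(clicked)
#     b.on_click('alice')   # -> clicked by alice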
#
# Greenlet classes
#

class Actor(Greenlet):
    """Simple implementation of the Actor pattern."""

    def __init__(self):
        self.inbox = PriorityQueue()
        self._handlers = {ShutdownRequest: self.receive_shutdown}
        Greenlet.__init__(self)

    def receive(self, msg):
        """Dispatch a received message to the appropriate type handler."""
        #log.debug("Received a message: " + repr(msg))
        cls = msg.__class__
        if cls in self._handlers:
            self._handlers[cls](msg)
        else:
            raise NotImplementedError()

    def receive_shutdown(self, msg):
        self.running = False

    def send(self, msg, priority=50):
        """Place a message into the actor's inbox."""
        self.inbox.put((priority, msg))

    def _run(self):
        self.running = True
        while self.running:
            prio, msg = self.inbox.get()
            self.receive(msg)
            del msg
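# Message classes double as dispatch keys: a subclass registers one
# handler per message type and inherits the mailbox loop. A toy example
# (EchoRequest/EchoActor are illustrative, not part of Metricinga):
#
#     class EchoRequest(object):
#         def __init__(self, text):
#             self.text = text
#
#     class EchoActor(Actor):
#         def __init__(self):
#             Actor.__init__(self)
#             self._handlers[EchoRequest] = self.receive_echo
#
#         def receive_echo(self, msg):
#             print("echo: %s" % msg.text)
#
#     ea = EchoActor()
#     ea.start()
#     ea.send(EchoRequest('hello'))
#     ea.send(ShutdownRequest(), priority=0)  # priority 0 jumps the queue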
class CarbonWriter(Actor):
    """Dispatch PublishMetricRequest messages to Carbon."""

    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.backoff_secs = 0
        self.max_backoff_secs = 32
        self.sleep_secs = 0
        Actor.__init__(self)
        self._handlers[PublishMetricRequest] = self.receive_publish
        self._sock = socket.socket()
        self._connect()

    def receive_publish(self, msg):
        metric = msg.metric
        (path, timestamp, value) = (metric.path, metric.timestamp,
                                    metric.value)
        name = '.'.join([self._sanitize_metric_name(x) for x in path])
        try:
            log.debug("Sending metric to Carbon: %s %s %s" %
                      (name, timestamp, value))
            pickle_list = [(name, (timestamp, value))]
            payload = pickle.dumps(pickle_list)
            header = struct.pack("!L", len(payload))
            message = header + payload
            self._sock.sendall(message)
            gevent.sleep(self.sleep_secs)
        except socket.error, ex:
            # Attempt to reconnect, then re-queue the unsent metric
            log.warn("Couldn't send to %s:%s: %s" %
                     (self.host, self.port, ex))
            self._connect()
            self.send(PublishMetricRequest(metric), priority=49)

    def _connect(self):
        """Connect to the Carbon server.

        Attempt to connect to the Carbon server. If the connection
        attempt fails, increase the backoff time and sleep the writer
        greenlet until the backoff time has elapsed.
        """
        gevent.sleep(self.backoff_secs)
        try:
            log.info("Connecting to Carbon instance at %s:%s" %
                     (self.host, self.port))
            # A socket object can't be reconnected after an error, so
            # always start from a fresh one
            self._sock = socket.socket()
            self._sock.connect((self.host, self.port))
            log.info("Connected to Carbon successfully")
            self._reset_backoff()
        except socket.error, ex:
            log.warn("Failed to connect to %s:%s" %
                     (self.host, self.port))
            self._increase_backoff()
            log.warn("Reconnecting in %s seconds" % self.backoff_secs)

    def _increase_backoff(self):
        if self.backoff_secs == 0:
            self.backoff_secs = 1
        elif self.backoff_secs < self.max_backoff_secs:
            self.backoff_secs *= 2

    def _reset_backoff(self):
        self.backoff_secs = 0

    def _sanitize_metric_name(self, s):
        return re.sub(r"[^\w-]", metric_replacement_char, s)
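# CarbonWriter speaks Carbon's pickle protocol: each write is a 4-byte
# big-endian length header followed by a pickled list of
# (path, (timestamp, value)) tuples. A usage sketch, assuming a Carbon
# pickle listener on localhost:2004 (host and values are illustrative):
#
#     cw = CarbonWriter('localhost', 2004)
#     cw.start()
#     cw.send(PublishMetricRequest(
#         Metric(['nagios', 'hs6bms', 'host-ping', 'rta'],
#                13.191, 1365442741)))
#     cw.send(ShutdownRequest(), priority=0)
#     cw.join()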
class FileProcessor(Actor):
    """Parse files and dispatch events when lines are found."""

    def __init__(self):
        Actor.__init__(self)
        self._handlers[ParseFileRequest] = self.receive_parse

    @event
    def on_line_found(self, line):
        """Called when a line is parsed from the file."""

    def receive_parse(self, message):
        """Handle received ParseFileRequest messages."""
        path = message.path
        log.debug("Received file parse request: " + path)
        source = PurgedFileFactory.create(path)
        if source:
            log.debug("Accepted file parse request: " + path)
            with open(path, "r") as fp:
                for line in fp:
                    sstr = SourcedString(line.rstrip(os.linesep),
                                         source)
                    self.on_line_found(sstr)
                    gevent.sleep(0)
        else:
            log.debug("Received request to parse {0}, "
                      "but file is already known".format(path))
class LineProcessor(Actor):
    """Process lines of check results."""

    def __init__(self):
        Actor.__init__(self)
        self._handlers[ParseLineRequest] = self.receive_line
        # Matches Nagios counters: name=value[uom][;warn][;crit][;min][;max]
        self.tokenizer_re = \
            r"([^\s]+|'[^']+')=([-.\d]+)(c|s|us|ms|B|KB|MB|GB|TB|%)?(?:;([-.\d]+))?(?:;([-.\d]+))?(?:;([-.\d]+))?(?:;([-.\d]+))?"

    @event
    def on_metric_found(self, metric):
        """Called when a metric is extracted by the line processor."""

    @event
    def on_parse_failed(self, line):
        """Called when the line processor fails to parse a line."""

    def receive_line(self, message):
        line = message.line.string_
        source = message.line.source
        fields = self._extract_fields(line)
        if not self._fields_valid(fields):
            return self.on_parse_failed(line)
        for metric in self._make_metrics(fields, source):
            self.on_metric_found(metric)
            gevent.sleep(0)

    def _extract_fields(self, line):
        """Parse KEY::VALUE pairs from a line of performance data."""
        acc = {}
        field_tokens = line.split("\t")
        for field_token in field_tokens:
            kv_tokens = field_token.split('::')
            if len(kv_tokens) == 2:
                (key, value) = kv_tokens
                acc[key] = value
        return acc

    def _fields_valid(self, d):
        """Verify that all necessary fields are present."""
        generic_fields = ['DATATYPE', 'HOSTNAME', 'TIMET']
        host_fields = ['HOSTPERFDATA']
        service_fields = ['SERVICEDESC', 'SERVICEPERFDATA']
        if 'DATATYPE' not in d:
            return False
        datatype = d['DATATYPE']
        if datatype == 'HOSTPERFDATA':
            fields = generic_fields + host_fields
        elif datatype == 'SERVICEPERFDATA':
            fields = generic_fields + service_fields
        else:
            return False
        for field in fields:
            if field not in d:
                return False
        return True
    def _make_metrics(self, fields, source):
        metric_path_base = []
        graphite_prefix = fields.get('GRAPHITEPREFIX')
        graphite_postfix = fields.get('GRAPHITEPOSTFIX')

        # Prefer the per-check GRAPHITEPREFIX from the perfdata line;
        # fall back to the globally configured --prefix, if any
        if graphite_prefix is not None:
            metric_path_base.append(graphite_prefix)
        elif metric_prefix:
            metric_path_base.append(metric_prefix)

        hostname = fields['HOSTNAME'].lower()
        metric_path_base.append(hostname)

        datatype = fields['DATATYPE']
        if datatype == 'HOSTPERFDATA':
            metric_path_base.append('host')
        elif datatype == 'SERVICEPERFDATA':
            service_desc = fields.get('SERVICEDESC')
            if graphite_postfix is not None:
                metric_path_base.append(graphite_postfix)
            else:
                metric_path_base.append(service_desc)

        timestamp = int(fields['TIMET'])
        perfdata = fields[datatype]
        counters = self._parse_perfdata(perfdata)

        for (counter, value) in counters:
            metric_path = metric_path_base + [counter]
            # Metric's signature is (path, value, timestamp, source)
            yield Metric(metric_path, value, timestamp, source)
    def _parse_perfdata(self, s):
        """Parse performance data from a *PERFDATA string."""
        metrics = []
        # re.findall returns an empty list (never None) on no match
        counters = re.findall(self.tokenizer_re, s)
        if not counters:
            log.warning("Failed to parse performance data: %s" % (s,))
            return metrics
        for (key, value, uom, warn, crit, min_, max_) in counters:
            try:
                metrics.append((key, float(value)))
            except ValueError, ex:
                log.warning("Couldn't convert value '%s' to float" % (value,))
        return metrics
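# For reference, the tokenizer applied to the HOSTPERFDATA value from the
# first sample line above yields one tuple per counter (non-matching
# optional groups come back as empty strings):
#
#     >>> import re
#     >>> s = "rta=13.191ms;300.000;500.000;0; pl=0%;20;60;;"
#     >>> re.findall(LineProcessor().tokenizer_re, s)
#     [('rta', '13.191', 'ms', '300.000', '500.000', '0', ''),
#      ('pl', '0', '%', '20', '60', '', '')]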
class SpoolRunner(Greenlet):
    """Find perfdata files in the spool directory and announce them.

    If poll_interval is None, the directory is swept only once.
    """

    def __init__(self, perfdata_dir, poll_interval=None):
        self.perfdata_dir = perfdata_dir
        self.poll_interval = poll_interval
        Greenlet.__init__(self)

    @event
    def on_find(self):
        """Called when a file is found by the spool runner."""

    def _run(self):
        while True:
            for filename in os.listdir(self.perfdata_dir):
                self.on_find(os.path.join(self.perfdata_dir, filename))
            if self.poll_interval is not None:
                gevent.sleep(self.poll_interval)
            else:
                break
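# gevent_inotifyx is imported above but never wired in; an event-driven
# alternative to the polling loop might look roughly like the sketch
# below. This is an untested assumption that gevent_inotifyx mirrors the
# inotifyx API (init()/add_watch()/get_events() and the IN_* constants):
#
#     class InotifyRunner(Greenlet):
#         def __init__(self, perfdata_dir):
#             self.perfdata_dir = perfdata_dir
#             Greenlet.__init__(self)
#
#         @event
#         def on_find(self):
#             """Called when a file appears in the watched directory."""
#
#         def _run(self):
#             fd = inotify.init()
#             inotify.add_watch(fd, self.perfdata_dir,
#                               inotify.IN_CLOSE_WRITE | inotify.IN_MOVED_TO)
#             while True:
#                 for ev in inotify.get_events(fd):
#                     if ev.name:
#                         self.on_find(os.path.join(self.perfdata_dir,
#                                                   ev.name))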
def parse_options():
    parser = OptionParser()
    parser.add_option('-d', '--debug',
                      help='Do not daemonize; enable debug-level logging',
                      dest='debug', action='store_true')
    parser.add_option('-D', '--spool-dir',
                      help='Path to performance data spool dir',
                      dest='spool_dir')
    parser.add_option('-H', '--host',
                      help='Host to submit metrics to',
                      dest='host')
    parser.add_option('-p', '--port',
                      help='Port to submit metrics to',
                      dest='port')
    parser.add_option('-P', '--prefix',
                      help='Prefix to prepend to all metric names',
                      dest='prefix')
    parser.add_option('-r', '--replacement-char',
                      help='Replacement character for illegal metric characters (e.g. ".")',
                      dest='replacement_char')
    (opts, args) = parser.parse_args()
    return (opts, args)
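# Example invocation (the Graphite hostname is illustrative):
#
#     python metricinga.py -H graphite.example.com -p 2004 \
#         -D /var/spool/metricinga -P nagios
#
# Add -d to stay in the foreground with debug logging.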
#
# Parse options
#
host = None
port = 2004
metric_prefix = None
metric_replacement_char = '_'
perfdata_spool_dir = '/var/spool/metricinga'
pidfile = '/var/run/metricinga.pid'
daemonize = True
log_level = logging.INFO

(opts, args) = parse_options()
if opts.host is not None:
    host = opts.host
if opts.port is not None:
    port = opts.port
if opts.prefix is not None:
    metric_prefix = opts.prefix
if opts.replacement_char is not None:
    metric_replacement_char = opts.replacement_char
if opts.spool_dir is not None:
    perfdata_spool_dir = opts.spool_dir
if opts.debug is True:
    log_level = logging.DEBUG
    daemonize = False

if host is None:
    print("Fatal: No Graphite host specified!")
    sys.exit(1)

if daemonize:
    log_handler = logging.handlers.SysLogHandler('/dev/log')
    formatter = logging.Formatter(
        "%(filename)s: %(levelname)s %(message)s")
else:
    log_handler = logging.StreamHandler()
    formatter = logging.Formatter(
        "%(asctime)s %(filename)s: %(levelname)s %(message)s",
        "%Y/%m/%d %H:%M:%S")

log_handler.setFormatter(formatter)
log = logging.getLogger('log')
log.addHandler(log_handler)
log.setLevel(log_level)
log.info("Starting up...")
# Init workers and run
def run():
    # FIXME: Don't need int() once migrated from optparse to argparse.
    # Use the resolved globals so the port default applies when -p is
    # not given (int(opts.port) would crash on None).
    cw = CarbonWriter(host, int(port))
    lp = LineProcessor()
    lp.on_metric_found.subscribe(
        lambda metric: cw.send(PublishMetricRequest(metric)))
    fp = FileProcessor()
    fp.on_line_found.subscribe(
        lambda line: lp.send(ParseLineRequest(line)))
    sp = SpoolRunner(perfdata_spool_dir, 60)
    sp.on_find.subscribe(lambda path: fp.send(ParseFileRequest(path)))

    actors = [cw, lp, fp]
    tasklets = [sp]
    workers = actors + tasklets

    def shutdown(actors, tasklets):
        log.info("Received shutdown signal")
        for actor in actors:
            actor.send(ShutdownRequest(), priority=0)
        for tasklet in tasklets:
            tasklet.kill()

    gevent.signal(signal.SIGINT, shutdown, actors, tasklets)
    gevent.signal(signal.SIGTERM, shutdown, actors, tasklets)

    for worker in workers:
        worker.start()
    gevent.joinall(workers)

if daemonize:
    context = DaemonContext()
    context.stderr = sys.stderr
    context.stdout = sys.stdout
    #context.signal_map = {signal.SIGTERM: shutdown, signal.SIGINT: shutdown}
    with context:
        run()
else:
    run()
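To feed Metricinga, Nagios/Icinga has to write perfdata files using the tab-delimited KEY::VALUE template shown in the sample lines at the top, and then rotate them into the spool directory. A sketch of the relevant nagios.cfg directives (paths, interval, and the move command are assumptions to adapt; the GRAPHITEPREFIX/GRAPHITEPOSTFIX fields come from custom host variables):

    process_performance_data=1
    host_perfdata_file=/var/spool/nagios/host-perfdata
    host_perfdata_file_mode=a
    host_perfdata_file_processing_interval=60
    host_perfdata_file_processing_command=move-host-perfdata
    host_perfdata_file_template=DATATYPE::HOSTPERFDATA\tTIMET::$TIMET$\tHOSTNAME::$HOSTNAME$\tHOSTPERFDATA::$HOSTPERFDATA$\tHOSTCHECKCOMMAND::$HOSTCHECKCOMMAND$\tHOSTSTATE::$HOSTSTATE$\tHOSTSTATETYPE::$HOSTSTATETYPE$\tGRAPHITEPREFIX::$_HOSTGRAPHITEPREFIX$\tGRAPHITEPOSTFIX::$_HOSTGRAPHITEPOSTFIX$

Here move-host-perfdata stands for a command that moves the rotated file into /var/spool/metricinga; the equivalent service_perfdata_file_* directives cover SERVICEPERFDATA lines.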