Skip to content

Instantly share code, notes, and snippets.

@creisor
Created November 3, 2021 15:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save creisor/0ba67406222197593126cb00c32eef5b to your computer and use it in GitHub Desktop.
Save creisor/0ba67406222197593126cb00c32eef5b to your computer and use it in GitHub Desktop.
Nagios check for journalctl
#!/usr/bin/env python3
"""A Nagios check for entries with a certain log level in journalctl.
For example, to get a WARNING when more than 2, and a CRITICAL when more than 5, entries with level 'error' were logged for a unit in the past 5 minutes:
check_journalctl -u my-cool-app --since 5m --log-level error --warning 2.0 --critical 5.0
"""
import sys
import re
import argparse
import datetime
import subprocess
import json
import logging
def parse_args(argv=None):
    """Parse the command-line arguments of the check.

    Args:
        argv: Optional list of argument strings to parse. ``None`` (the
            default) makes argparse read ``sys.argv[1:]``, as before.

    Returns:
        argparse.Namespace with the attributes ``unit_name``, ``since``
        (already converted by ``since_abbreviation``), ``level``,
        ``warning``, ``critical`` and ``verbose``.

    Raises:
        SystemExit: raised by argparse for ``--help`` and for invalid or
            missing arguments.
    """
    parser = argparse.ArgumentParser(description='Check for a loglevel in syslog for a unit.')
    # Value-taking options deliberately do NOT use nargs='?': with it, a bare
    # flag such as "-u" (no value) was accepted and stored as None, which
    # satisfied required=True and only blew up later in the check.
    parser.add_argument('--unit-name', '-u', type=str, dest='unit_name', required=True,
                        help='the systemd unit name of the program whose log is being checked (see the -u flag for journalctl)')
    parser.add_argument('--since', '-s', dest='since', required=True, type=since_abbreviation,
                        help='how much time in seconds (s), minutes (m), or hours (h) you want to search backwards, (e.g.: "5m" or "600s")')
    parser.add_argument('--log-level', '-l', type=str, dest='level',
                        help='the loglevel to search for', default='error')
    parser.add_argument('--warning', '-w', type=float, dest='warning', default=1.0,
                        help='warning threshold count of entries of "log-level" type (e.g.: if --log-level is error, N errors)')
    parser.add_argument('--critical', '-c', type=float, dest='critical', default=2.0,
                        help='critical threshold count of entries of "log-level" type (e.g.: if --log-level is error, N errors)')
    parser.add_argument('--verbose', '-V', help='Verbose logging', action='store_true')
    return parser.parse_args(argv)
def since_abbreviation(since):
    """Convert a relative period such as "5m" into an absolute journalctl timestamp.

    Intended for use as an argparse ``type=`` callable.

    Args:
        since: String of the form ``N[smh]``: a non-negative integer followed
            by ``s`` (seconds), ``m`` (minutes) or ``h`` (hours).

    Returns:
        The local time "now minus that period" formatted as
        ``"%Y-%m-%d %H:%M:%S"``.

    Raises:
        argparse.ArgumentTypeError: if ``since`` is not of the expected form
            or the period is too large to be subtracted from the current time.
    """
    err_msg = 'since abbreviation should be in the form N[smh], e.g.: 600s, 5m, 1h'
    # fullmatch (rather than search with ^...$) also rejects a trailing newline.
    match = re.fullmatch(r'(?P<number>\d+)(?P<period>[smh])', since)
    if not match:
        raise argparse.ArgumentTypeError(err_msg)

    # The regex guarantees the period is one of these keys.
    timedelta_kwarg = {'s': 'seconds', 'm': 'minutes', 'h': 'hours'}[match.group('period')]
    try:
        delta = datetime.timedelta(**{timedelta_kwarg: int(match.group('number'))})
        start = datetime.datetime.today() - delta
    except OverflowError as exc:
        # Huge counts overflow timedelta/datetime; report them as a normal
        # argument error instead of letting a traceback escape from argparse.
        raise argparse.ArgumentTypeError('since period is too large: {}'.format(since)) from exc

    # from man journalctl:
    # Date specifications should be of the format "2012-10-30 18:17:16". If the time part is omitted, "00:00:00" is assumed.
    return start.strftime("%Y-%m-%d %H:%M:%S")
class Monitor:
    """Nagios-style check counting journald entries of one log level for a unit.

    The journal is read with ``journalctl -o json``. Each entry's ``MESSAGE``
    field is expected to be a JSON object itself (structured application
    logging) carrying ``level`` and ``msg`` keys; entries that do not fit
    that shape are ignored.
    """

    # Nagios plugin states: label printed in the status line and exit code.
    states = {
        'ok': {'text': 'OK', 'code': 0},
        'warning': {'text': 'WARNING', 'code': 1},
        'critical': {'text': 'CRITICAL', 'code': 2},
        'unknown': {'text': 'UNKNOWN', 'code': 3},
    }

    def __init__(self, unit_name, since, level, warning, critical):
        """Store the check parameters.

        Args:
            unit_name: systemd unit passed to ``journalctl -u``.
            since: start timestamp passed to ``journalctl --since``.
            level: value of the ``level`` key that makes an entry count.
            warning: WARNING is reported when the count is greater than this.
            critical: CRITICAL is reported when the count is greater than this.
        """
        self.unit_name = unit_name
        self.since = since
        self.level = level
        self.warning = warning
        self.critical = critical
        # Filled in by check(); defined here so the attribute always exists.
        self.messages = []

    def check(self):
        """Run the check, print the Nagios status line and return the exit code.

        Returns:
            0 (OK), 1 (WARNING) or 2 (CRITICAL) depending on the number of
            matching entries, or 3 (UNKNOWN) when journalctl could not be run
            or exited with a non-zero status.
        """
        try:
            logs = self.__get_journal_logs()
        except (OSError, subprocess.CalledProcessError) as err:
            # A failing journalctl says nothing about the monitored unit, so
            # report UNKNOWN instead of dying with a traceback (exit status 1,
            # which Nagios would interpret as WARNING).
            unknown = self.states['unknown']
            print("JOURNALCTL {} - could not read journal: {}".format(unknown['text'], err))
            return unknown['code']
        # Entries without a 'level' key never match; a missing 'msg' still
        # counts towards the metric but contributes an empty message.
        self.messages = [
            entry.get('msg', '')
            for entry in logs
            if 'level' in entry and entry['level'] == self.level
        ]
        return self.__eval()

    def __eval(self):
        """Compare the number of collected messages against the thresholds."""
        logging.debug("Evaluating with warning threshold of '{}' and critical threshold of '{}'".format(self.warning, self.critical))
        metric = len(self.messages)
        # str() keeps the debug line from failing when 'msg' is not a string.
        quoted = '"{0}"'.format('", "'.join(str(message) for message in self.messages))
        logging.debug("{} {} messages: {}".format(metric, self.level, quoted))
        # Thresholds are exclusive: the count has to exceed them to alert.
        if metric > self.critical:
            return self.__status(self.states['critical'], metric)
        if metric > self.warning:
            return self.__status(self.states['warning'], metric)
        return self.__status(self.states['ok'], metric)

    def __status(self, state, metric):
        """Print the Nagios status line for ``state`` and return its exit code."""
        output_template = "JOURNALCTL {} - {} messages at log level '{}'"
        print(output_template.format(state['text'], metric, self.level))
        return state['code']

    def __get_journal_logs(self):
        """Run journalctl and return the parsed JSON MESSAGE payloads.

        Returns:
            List of dicts, one per journal entry whose MESSAGE is a JSON object.

        Raises:
            OSError: if journalctl cannot be executed.
            subprocess.CalledProcessError: if journalctl exits non-zero.
        """
        # Argument list without a shell: the unit name and timestamp reach
        # journalctl verbatim, so no quoting is needed and nothing in them can
        # be interpreted by a shell.
        cmd = ['journalctl', '-u', self.unit_name, '-o', 'json', '--since', self.since]
        logging.debug("Running cmd: %s", cmd)
        stdout = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
        return self.__parse_journal_output(stdout)

    @staticmethod
    def __parse_journal_output(stdout):
        """Extract the JSON-object MESSAGE payloads from ``journalctl -o json`` output.

        Args:
            stdout: raw bytes, one JSON document per line. Because stderr is
                merged into it, it may also contain non-JSON lines.

        Returns:
            List of dicts in journal order. Lines that are not JSON objects,
            entries without a string MESSAGE (missing, or binary data that
            journalctl emits as an array) and MESSAGE values that are not a
            JSON object are skipped.
        """
        inner_logs = []
        for logline in stdout.splitlines():
            if not logline.strip():
                continue
            try:
                # ValueError covers both JSONDecodeError and UnicodeDecodeError.
                entry = json.loads(logline.decode('utf-8'))
            except ValueError:
                continue
            if not isinstance(entry, dict):
                continue
            message = entry.get('MESSAGE')
            if not isinstance(message, str):
                continue
            try:
                payload = json.loads(message)
            except ValueError:
                continue
            if isinstance(payload, dict):
                inner_logs.append(payload)
        return inner_logs
def main():
    """Parse the arguments, run the check and return the Nagios exit code.

    Problems that are not about the monitored unit (bad command line,
    unexpected errors while checking) are reported as UNKNOWN (3). Otherwise
    Python's default exit status 1 for an uncaught exception would read as
    WARNING, and argparse's exit status 2 for usage errors as CRITICAL.

    Returns:
        The exit code produced by ``Monitor.check()``, or the UNKNOWN code
        when the check raised an exception.
    """
    unknown = Monitor.states['unknown']
    try:
        args = parse_args()
    except SystemExit as exc:
        # --help exits with 0 and is passed through untouched.
        if exc.code in (0, None):
            raise
        sys.exit(unknown['code'])

    log_format = '%(asctime)-15s - %(message)s'
    log_level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(level=log_level, format=log_format)
    logging.debug("Checking logs since {}".format(args.since))

    mon = Monitor(args.unit_name, args.since, args.level, args.warning, args.critical)
    try:
        return mon.check()
    except Exception as exc:
        # Top-level boundary: keep the one-line Nagios output contract and
        # show the traceback only when --verbose is given.
        logging.debug("Check failed", exc_info=True)
        print("JOURNALCTL {} - check failed: {}".format(unknown['text'], exc))
        return unknown['code']


if __name__ == "__main__":
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment