xrrtrcheck
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# | |
import os | |
import sys | |
import argparse | |
import xmltodict | |
from ncclient import manager | |
# Default thresholds; each one can be overridden on the command line.
# Minimum absolute number of IPv4 VRPs a server is expected to report.
XRRTRSTATUS_DEFAULT_V4_VRPS_MIN = 150000
# Minimum absolute number of IPv6 VRPs a server is expected to report.
XRRTRSTATUS_DEFAULT_V6_VRPS_MIN = 20000
# Maximum allowed difference in IPv4 VRPs between RTR servers.
XRRTRSTATUS_DEFAULT_V4_MAX_DEV = 30000
# Maximum allowed difference in IPv6 VRPs between RTR servers.
XRRTRSTATUS_DEFAULT_V6_MAX_DEV = 10000
def getRpkiServerList(host, port, username, password, key_verify = True):
    """Fetch the RPKI server list from a device via NETCONF.

    Opens a NETCONF session, issues a filtered ``get`` for the
    ``rpki-server-list`` subtree of the Cisco-IOS-XR-ipv4-bgp-oper model,
    and parses the XML reply with xmltodict.

    Args:
        host: hostname or IP of the NETCONF endpoint.
        port: port of the NETCONF endpoint.
        username: SSH username passed to ncclient.
        password: SSH password passed to ncclient.
        key_verify: passed to ncclient as ``hostkey_verify``.

    Returns:
        A list of ``rpki-server`` entries as produced by xmltodict. A single
        entry is wrapped in a list so callers can always iterate.

    Raises:
        KeyError / TypeError: if the reply does not contain the expected
            ``bgp/instances/instance/instance-active/rpki-server-list``
            path. Exceptions raised by ncclient propagate unchanged.
    """
    # Create an XML filter for targeted NETCONF queries
    netconf_filter = """
    <filter>
      <bgp xmlns="http://cisco.com/ns/yang/Cisco-IOS-XR-ipv4-bgp-oper">
        <instances>
          <instance>
            <instance-active>
              <rpki-server-list>
              </rpki-server-list>
            </instance-active>
          </instance>
        </instances>
      </bgp>
    </filter>"""

    # Use the manager as a context manager so the NETCONF session is closed
    # even when the query raises.
    with manager.connect(host=host, port=port, username=username,
                         password=password, hostkey_verify=key_verify) as m:
        # issue get query with filter
        netconf_reply = m.get(filter=netconf_filter)

    # parse the returned XML
    netconf_data = xmltodict.parse(netconf_reply.xml)["rpc-reply"]["data"]

    # extract rpki-server-list
    active = netconf_data['bgp']['instances']['instance']['instance-active']
    rpkilist = active['rpki-server-list']['rpki-server']

    # xmltodict yields a bare mapping for a single RPKI server; make a list anyway
    if not isinstance(rpkilist, list):
        rpkilist = [rpkilist]
    return rpkilist
def rpkiServerListCheck (rpkilist, ipv4_vrps_min=XRRTRSTATUS_DEFAULT_V4_VRPS_MIN,
        ipv6_vrps_min=XRRTRSTATUS_DEFAULT_V6_VRPS_MIN, ipv4_vrps_maxdev=XRRTRSTATUS_DEFAULT_V4_MAX_DEV,
        ipv6_vrps_maxdev=XRRTRSTATUS_DEFAULT_V6_MAX_DEV):
    """Evaluate each RPKI server entry and collect error / healthy messages.

    Every entry is a mapping with the keys ``ipv4roa``, ``ipv6roa``,
    ``name`` and ``state``. Checks run in this order and only the first
    failing one is reported per server: connection state, IPv4 minimum,
    IPv6 minimum, IPv4 deviation from the previous server, IPv6 deviation
    from the previous server.

    Args:
        rpkilist: iterable of server entries.
        ipv4_vrps_min: minimum IPv4 VRP count.
        ipv6_vrps_min: minimum IPv6 VRP count.
        ipv4_vrps_maxdev: maximum IPv4 VRP difference to the previous
            server; a falsy value skips the check.
        ipv6_vrps_maxdev: maximum IPv6 VRP difference to the previous
            server; a falsy value skips the check.

    Returns:
        Tuple ``(error_output, verbose_output)`` of newline-terminated
        message strings; either may be empty.
    """
    problems = []
    healthy = []
    prev_name = None
    prev_v4 = None
    prev_v6 = None

    for srv in rpkilist:
        v4 = int(srv['ipv4roa'])
        v6 = int(srv['ipv6roa'])
        name = srv['name']
        state = srv['state']

        if state != 'connected':
            problem = "%s state is %s!" % (name, state)
        elif v4 < ipv4_vrps_min:
            problem = "%s IPv4 VRPs %s lower than expected %s!" % (name, v4, ipv4_vrps_min)
        elif v6 < ipv6_vrps_min:
            problem = "%s IPv6 VRPs %s lower than expected %s!" % (name, v6, ipv6_vrps_min)
        # Deviation checks are skipped when there is no previous count (or it
        # is zero) and when the maximum deviation is zero.
        elif prev_v4 and ipv4_vrps_maxdev and abs(prev_v4 - v4) > ipv4_vrps_maxdev:
            problem = "%s IPv4 VRPs %s deviates more than %s from %s with %s IPv4 VRPs!" % (
                name, v4, ipv4_vrps_maxdev, prev_name, prev_v4)
        elif prev_v6 and ipv6_vrps_maxdev and abs(prev_v6 - v6) > ipv6_vrps_maxdev:
            problem = "%s IPv6 VRPs %s deviates more than %s from %s with %s IPv6 VRPs!" % (
                name, v6, ipv6_vrps_maxdev, prev_name, prev_v6)
        else:
            problem = None

        if problem is None:
            healthy.append("%s is healthy (IPv4/6 VRPs: %s/%s)." % (name, v4, v6))
        else:
            problems.append(problem)

        # The current server becomes the reference for the next one,
        # regardless of whether it passed its own checks.
        prev_name, prev_v4, prev_v6 = name, v4, v6

    error_output = "".join(line + "\n" for line in problems)
    verbose_output = "".join(line + "\n" for line in healthy)
    return error_output, verbose_output
def nagios_exit(error_output, verbose, verbose_output):
    """Print the collected messages and terminate the process.

    Args:
        error_output: error text; when non-empty it is written to stderr
            and the exit status is 2, otherwise the exit status is 0.
        verbose: when true, ``verbose_output`` is written to stdout.
        verbose_output: text describing healthy servers.

    Raises:
        SystemExit: always, carrying the exit status.
    """
    # NOTE(review): errors go to stderr; confirm the monitoring system
    # consuming this check reads stderr as well as stdout.
    has_errors = bool(error_output)
    if has_errors:
        print(error_output, file=sys.stderr, end='')
    if verbose:
        print(verbose_output, end='')
    sys.exit(2 if has_errors else 0)
def main():
    """Parse the command line, query the device and exit with a status code.

    Username and password fall back to the environment variables
    ``XRRTRSTATUS_SSH_USER`` and ``XRRTRSTATUS_SSH_PW`` when they are not
    given on the command line. The process is terminated by
    ``nagios_exit``.
    """
    parser = argparse.ArgumentParser(description='monitor RPKI/RTR servers status as seen by an IOS-XR device via NETCONF',
                                     epilog='v0.1, Copyright (c) 2020, Lukas Tribus <lukas@ltri.eu>')
    parser.add_argument('host', help='hostname or IP')
    parser.add_argument('--port', '-p', dest='port', help='port (default: 830)', type=int, default=830)
    parser.add_argument('--insecure', '-i', dest='key_verify', help='disable SSH key verification', action='store_false')
    parser.add_argument('--username', '-U', dest='username', help='NETCONF SSH username (or environment variable XRRTRSTATUS_SSH_USER)', default=None)
    parser.add_argument('--password', '-P', dest='password', help='NETCONF SSH password (or environment variable XRRTRSTATUS_SSH_PW)', default=None)
    parser.add_argument('--ipv4-vrps-min', '-4m', dest='ipv4_vrps_min', help='minimum number of absolute IPv4 VRPs, zero to disable (default %s)' % str(XRRTRSTATUS_DEFAULT_V4_VRPS_MIN), type=int, default=XRRTRSTATUS_DEFAULT_V4_VRPS_MIN)
    parser.add_argument('--ipv6-vrps-min', '-6m', dest='ipv6_vrps_min', help='minimum number of absolute IPv6 VRPs, zero to disable (default %s)' % str(XRRTRSTATUS_DEFAULT_V6_VRPS_MIN), type=int, default=XRRTRSTATUS_DEFAULT_V6_VRPS_MIN)
    # Help text fixed: the original read "(default (default N)".
    parser.add_argument('--ipv4-vrps-maxdev', '-4d', dest='ipv4_vrps_maxdev', help='maximum deviation of IPv4 VRPs between RTR servers, zero to disable (default %s)' % str(XRRTRSTATUS_DEFAULT_V4_MAX_DEV), type=int, default=XRRTRSTATUS_DEFAULT_V4_MAX_DEV)
    parser.add_argument('--ipv6-vrps-maxdev', '-6d', dest='ipv6_vrps_maxdev', help='maximum deviation of IPv6 VRPs between RTR servers, zero to disable (default %s)' % str(XRRTRSTATUS_DEFAULT_V6_MAX_DEV), type=int, default=XRRTRSTATUS_DEFAULT_V6_MAX_DEV)
    parser.add_argument('--verbose', '-v', dest='verbose', help='verbose output', action='store_true')
    parser.set_defaults(key_verify=True, verbose=False)
    args = parser.parse_args()

    # Command-line credentials win; otherwise fall back to the environment.
    username = args.username or os.getenv('XRRTRSTATUS_SSH_USER')
    password = args.password or os.getenv('XRRTRSTATUS_SSH_PW')

    rpkiserverlist = getRpkiServerList(args.host, args.port, username, password, args.key_verify)
    error_output, verbose_output = rpkiServerListCheck(rpkiserverlist, args.ipv4_vrps_min,
                                                       args.ipv6_vrps_min, args.ipv4_vrps_maxdev,
                                                       args.ipv6_vrps_maxdev)
    nagios_exit(error_output, args.verbose, verbose_output)


# Run the check only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment