Skip to content

Instantly share code, notes, and snippets.

@lukastribus
Created December 17, 2020 16:16
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lukastribus/695c9e780d118755271519d4d3cb54f3 to your computer and use it in GitHub Desktop.
Save lukastribus/695c9e780d118755271519d4d3cb54f3 to your computer and use it in GitHub Desktop.
xrrtrcheck
#!/usr/bin/env python3
#
import os
import sys
import argparse
import xmltodict
from ncclient import manager
XRRTRSTATUS_DEFAULT_V4_VRPS_MIN = 150000
XRRTRSTATUS_DEFAULT_V6_VRPS_MIN = 20000
XRRTRSTATUS_DEFAULT_V4_MAX_DEV = 30000
XRRTRSTATUS_DEFAULT_V6_MAX_DEV = 10000
def getRpkiServerList(host, port, username, password, key_verify = True):
# Create an XML filter for targeted NETCONF queries
netconf_filter = """
<filter>
<bgp xmlns="http://cisco.com/ns/yang/Cisco-IOS-XR-ipv4-bgp-oper">
<instances>
<instance>
<instance-active>
<rpki-server-list>
</rpki-server-list>
</instance-active>
</instance>
</instances>
</bgp>
</filter>"""
# connect to NETCONF endpoint
m=manager.connect(host=host,port=port,username=username,password=password,hostkey_verify=key_verify)
# issue get query with filter
netconf_reply = m.get(filter = netconf_filter)
# parse the returned XML
netconf_data = xmltodict.parse(netconf_reply.xml)["rpc-reply"]["data"]
# extract rpki-server-list
rpkilist = netconf_data['bgp']['instances']['instance']['instance-active']['rpki-server-list']['rpki-server']
# single RPKI server, make a list anyway
if not isinstance(rpkilist, list):
rpkilist = (rpkilist,)
return rpkilist
def rpkiServerListCheck (rpkilist, ipv4_vrps_min=XRRTRSTATUS_DEFAULT_V4_VRPS_MIN,
ipv6_vrps_min=XRRTRSTATUS_DEFAULT_V6_VRPS_MIN, ipv4_vrps_maxdev=XRRTRSTATUS_DEFAULT_V4_MAX_DEV,
ipv6_vrps_maxdev=XRRTRSTATUS_DEFAULT_V6_MAX_DEV):
last_v4_vrps=None
last_v6_vrps=None
verbose_output=''
error_output=''
for srv in rpkilist:
cur_v4_vrps=int(srv['ipv4roa'])
cur_v6_vrps=int(srv['ipv6roa'])
cur_name=srv['name']
if not srv['state'] == 'connected':
error_output += "%s state is %s!"%(cur_name, srv['state'])+"\n"
elif cur_v4_vrps < ipv4_vrps_min:
error_output += "%s IPv4 VRPs %s lower than expected %s!"%(cur_name, str(cur_v4_vrps), str(ipv4_vrps_min))+"\n"
elif cur_v6_vrps < ipv6_vrps_min:
error_output += "%s IPv6 VRPs %s lower than expected %s!"%(cur_name, str(cur_v6_vrps), str(ipv6_vrps_min))+"\n"
elif last_v4_vrps and ipv4_vrps_maxdev and abs(last_v4_vrps - cur_v4_vrps) > ipv4_vrps_maxdev:
error_output += "%s IPv4 VRPs %s deviates more than %s from %s with %s IPv4 VRPs!"%(cur_name, str(cur_v4_vrps), str(ipv4_vrps_maxdev), last_name, str(last_v4_vrps))+"\n"
elif last_v6_vrps and ipv6_vrps_maxdev and abs(last_v6_vrps - cur_v6_vrps) > ipv6_vrps_maxdev:
error_output += "%s IPv6 VRPs %s deviates more than %s from %s with %s IPv6 VRPs!"%(cur_name, str(cur_v6_vrps), str(ipv6_vrps_maxdev), last_name, str(last_v6_vrps))+"\n"
else:
verbose_output += ("%s is healthy (IPv4/6 VRPs: %s/%s)." % (cur_name, str(cur_v4_vrps), str(cur_v6_vrps)))+"\n"
last_name=cur_name
last_v4_vrps=cur_v4_vrps
last_v6_vrps=cur_v6_vrps
return error_output, verbose_output
def nagios_exit(error_output, verbose, verbose_output):
if error_output:
print(error_output, file=sys.stderr, end='')
exit_code=2
else:
exit_code=0
if verbose:
print (verbose_output, end='')
sys.exit(exit_code)
def main():
parser = argparse.ArgumentParser(description='monitor RPKI/RTR servers status as seen by an IOS-XR device via NETCONF',
epilog='v0.1, Copyright (c) 2020, Lukas Tribus <lukas@ltri.eu>')
parser.add_argument('host', help='hostname or IP')
parser.add_argument('--port', '-p', dest='port', help='port (default: 830)', type=int, default=830)
parser.add_argument('--insecure', '-i', dest='key_verify', help='disable SSH key verification', action='store_false')
parser.add_argument('--username', '-U', dest='username', help='NETCONF SSH username (or environment variable XRRTRSTATUS_SSH_USER)', default=None)
parser.add_argument('--password', '-P', dest='password', help='NETCONF SSH password (or environment variable XRRTRSTATUS_SSH_PW)', default=None)
parser.add_argument('--ipv4-vrps-min', '-4m', dest='ipv4_vrps_min', help='minimum number of absolute IPv4 VRPs, zero to disable (default %s)' % str(XRRTRSTATUS_DEFAULT_V4_VRPS_MIN), type=int, default=XRRTRSTATUS_DEFAULT_V4_VRPS_MIN)
parser.add_argument('--ipv6-vrps-min', '-6m', dest='ipv6_vrps_min', help='minimum number of absolute IPv6 VRPs, zero to disable (default %s)' % str(XRRTRSTATUS_DEFAULT_V6_VRPS_MIN), type=int, default=XRRTRSTATUS_DEFAULT_V6_VRPS_MIN)
parser.add_argument('--ipv4-vrps-maxdev', '-4d', dest='ipv4_vrps_maxdev', help='maximum deviation of IPv4 VRPs between RTR servers, zero to disable (default (default %s)' % str(XRRTRSTATUS_DEFAULT_V4_MAX_DEV), type=int, default=XRRTRSTATUS_DEFAULT_V4_MAX_DEV)
parser.add_argument('--ipv6-vrps-maxdev', '-6d', dest='ipv6_vrps_maxdev', help='maximum deviation of IPv6 VRPs between RTR servers, zero to disable (default (default %s)' % str(XRRTRSTATUS_DEFAULT_V6_MAX_DEV), type=int, default=XRRTRSTATUS_DEFAULT_V6_MAX_DEV)
parser.add_argument('--verbose', '-v', dest='verbose', help='verbose output', action='store_true')
parser.set_defaults(key_verify=True, verbose=False)
args = vars(parser.parse_args())
host = args['host']
port = args['port']
key_verify = args['key_verify']
username = args['username'] if args['username'] else os.getenv('XRRTRSTATUS_SSH_USER')
password = args['password'] if args['password'] else os.getenv('XRRTRSTATUS_SSH_PW')
ipv4_vrps_min = args['ipv4_vrps_min']
ipv6_vrps_min = args['ipv6_vrps_min']
ipv4_vrps_maxdev = args['ipv4_vrps_maxdev']
ipv6_vrps_maxdev = args['ipv6_vrps_maxdev']
verbose = args['verbose']
rpkiserverlist = getRpkiServerList(host, port, username, password, key_verify)
error_output, verbose_output = rpkiServerListCheck(rpkiserverlist, ipv4_vrps_min,
ipv6_vrps_min, ipv4_vrps_maxdev, ipv6_vrps_maxdev)
nagios_exit(error_output, verbose, verbose_output)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment