Skip to content

Instantly share code, notes, and snippets.

@cidrblock
Created November 21, 2017 19:16
Show Gist options
  • Save cidrblock/04c26846ef1aeebab1aa3ad9e1a817b5 to your computer and use it in GitHub Desktop.
Save cidrblock/04c26846ef1aeebab1aa3ad9e1a817b5 to your computer and use it in GitHub Desktop.
Check the health of your ntp servers
""" ntpq out parser and reporter
"""
import re
import socket
# pylint: disable=redefined-builtin
from ansible.module_utils.basic import AnsibleModule
# Example playbook usage:
#
# - name: Run the ntp health module against the output of the ntp status command
#   ntp_health:
#     output: "{{ output['stdout'][0] }}"  # the output of 'show ntp associations' or 'show ntp peer-status'
#     ntp_servers: "{{ ntp_servers }}"     # a list of ntp servers
#     os: "{{ os }}"                       # netmiko style os (cisco_ios, cisco_nxos, cisco_xe etc)
#     domain_name: "company.net"           # the domain name to add to ntp servers where their domain gets cut off
#   register: health
#   ignore_errors: true
#
# - debug: var=health
# Line-level patterns for the NTP association/peer tables, keyed by
# netmiko-style OS name. Each pattern is anchored to a whole line and exposes
# its columns as named groups (always at least: status, remote, stratum, poll,
# reach, delay). The dots inside IPv4 addresses are escaped so that only a
# literal "." is accepted as an octet separator.
REGEXES = {
    "arista_eos": re.compile(r'''
        ^                                       # Beginning of line
        (?P<status>[\sx\.\-+#\*o])              # Capture the status
        (?P<remote>\S+)\s+                      # The remote name
        (?P<refid>(\d{1,3}\.){3}\d{1,3})\s+     # The remote's refid followed by spaces
        (?P<stratum>\d{1,2})\s+                 # The stratum of the remote followed by spaces
        (?P<type>[lumb-])\s+                    # The type of remote, followed by spaces
        (?P<when>\d+)\s+                        # The last time the server was queried
        (?P<poll>\d+)\s+                        # Frequency of poll, followed by spaces
        (?P<reach>\d+)\s+                       # Reach, followed by spaces
        (?P<delay>[\d\.]*)\s+                   # Delay, followed by spaces
        (?P<offset>[\d\.-]*)\s+                 # Offset followed by spaces
        (?P<jitter>[\d\.]*)                     # Jitter
        $                                       # End of line
        ''', re.VERBOSE),
    "cisco_ios": re.compile(r'''
        ^                                       # Beginning of line
        (?P<status>[\sx\.\-+#\*o])              # Capture the status
        (?P<configured>\~)                      # Configured
        (?P<remote>(\d{1,3}\.){3}\d{1,3})\s+    # The remote IP followed by spaces
        (?P<refid>(\d{1,3}\.){3}\d{1,3})\s+     # The remote's refid followed by spaces
        (?P<stratum>\d{1,2})\s+                 # The stratum of the remote followed by spaces
        (?P<when>\d+)\s+                        # The last time the server was queried
        (?P<poll>\d+)\s+                        # Frequency of poll, followed by spaces
        (?P<reach>\d+)\s+                       # Reach, followed by spaces
        (?P<delay>[\d\.]*)\s+                   # Delay, followed by spaces
        (?P<offset>[\d\.-]*)\s+                 # Offset followed by spaces
        (?P<jitter>[\d\.]*)                     # Jitter
        $                                       # End of line
        ''', re.VERBOSE),
    "cisco_nxos": re.compile(r'''
        ^                                       # Beginning of line
        (?P<status>[\*\+=-])                    # Capture the status
        (?P<remote>(\d{1,3}\.){3}\d{1,3})\s+    # The remote IP followed by spaces
        (?P<local>(\d{1,3}\.){3}\d{1,3})\s+     # The local IP followed by spaces
        (?P<stratum>\d{1,2})\s+                 # The stratum of the remote followed by spaces
        (?P<poll>\d+)\s+                        # Frequency of poll, followed by spaces
        (?P<reach>\d+)\s+                       # Reach, followed by spaces
        (?P<delay>[\d\.]*)                      # Delay
        (\s+(?P<vrf>\S+))?                      # spaces, VRF (optional)
        (\s+)?                                  # Optional trailing spaces before end of line
        $                                       # End of line
        ''', re.VERBOSE),
    "cisco_xr": re.compile(r'''
        ^                                       # Beginning of line
        (?P<status>[\sx\.\-+#\*o])              # Capture the status
        (?P<configured>\~)                      # Configured
        (?P<remote>(\d{1,3}\.){3}\d{1,3})\s+    # The remote IP followed by spaces
        (vrf\s(?P<vrf>\S+)\s+)?                 # Optional VRF
        (?P<refid>(\d{1,3}\.){3}\d{1,3})\s+     # The remote's refid followed by spaces
        (?P<stratum>\d{1,2})\s+                 # The stratum of the remote followed by spaces
        (?P<when>\d+)\s+                        # The last time the server was queried
        (?P<poll>\d+)\s+                        # Frequency of poll, followed by spaces
        (?P<reach>\d+)\s+                       # Reach, followed by spaces
        (?P<delay>[\d\.]*)\s+                   # Delay, followed by spaces
        (?P<offset>[\d\.-]*)\s+                 # Offset followed by spaces
        (?P<jitter>[\d\.]*)                     # Jitter
        $                                       # End of line
        ''', re.VERBOSE),
}
def nxos_vdc(os, output):
    """ Report whether the output is the NX-OS "not controlled by NTP" VDC notice
    Args:
        os (str): The netmiko style os name
        output (str): The output from the ntp status command
    Returns:
        bool: True only when os is "cisco_nxos" and the output contains the
            VDC notice, otherwise False
    """
    # Always hand back a real bool; no path falls through to an implicit None.
    return os == "cisco_nxos" and "System clock is not controlled by NTP in this VDC" in output
def parse_output(params):
    """ Match every line of the output against the regex for the given os
    Args:
        params (dict): Must provide 'os' (key into REGEXES) and 'output'
            (the raw text of the ntp status command)
    Returns:
        tuple: (None, list of group dicts, one per matching line) on success,
            or (error message, None) when there is no regex for the os
    """
    platform = params['os']
    if platform not in REGEXES:
        return "No regex support for %s" % platform, None
    pattern = REGEXES[platform]
    # Lines that do not match (headers, legends, blanks) are dropped silently.
    candidates = (re.match(pattern, text) for text in params['output'].splitlines())
    return None, [found.groupdict() for found in candidates if found]
def arista_resolve(entries, domain_name):
    """ Resolve the remote name of each entry to an IP address, in place
    The first label of each entry's 'remote' is combined with domain_name and
    looked up; on success 'remote' is overwritten with the resulting address.
    Args:
        entries (list): The list of parsed entry dicts, each with a 'remote' key
        domain_name (str): The domain to append to the short host name; when
            empty or None no lookups are attempted
    Returns:
        list: The same list of entries
    """
    if not domain_name:
        # Without a domain there is nothing sensible to look up; this avoids
        # querying names such as "host.None".
        return entries
    for entry in entries:
        short_name = entry['remote'].split('.')[0]
        try:
            entry['remote'] = socket.gethostbyname("%s.%s" % (short_name, domain_name))
        except (socket.gaierror, socket.herror, UnicodeError):
            # Best effort: leave the remote exactly as the device reported it.
            pass
    return entries
def xr_unwrap(output):
    """ Unwrap long XR entries
    A line that starts with whitespace followed by an IPv4 address is treated
    as the wrapped tail of the line before it, and the two are joined with a
    single space.
    Args:
        output (str): The str output
    Returns:
        str: Unwrapped output, lines joined with newlines
    """
    continuation = re.compile(r'^\s+(\d{1,3}\.){3}\d{1,3}')
    lines = output.splitlines()
    unwrapped = []
    i = len(lines) - 1
    # Walk from the bottom up so a continuation line can pull in its parent.
    while i >= 0:
        # The first line has no parent to join with, so it is kept as-is
        # instead of wrapping around to the last line via index -1.
        if i > 0 and continuation.match(lines[i]):
            unwrapped.append("%s %s" % (lines[i - 1], lines[i]))
            i -= 2
        else:
            unwrapped.append(lines[i])
            i -= 1
    unwrapped.reverse()
    return "\n".join(unwrapped)
def find_failed(entries):
    """ Flag every entry whose stratum is "16"
    Args:
        entries (list): The parsed NTP server entries, each with a 'stratum' string
    Return:
        list: The same entries, each updated in place with a boolean
            'stratum_ok' (False when the stratum is "16")
    """
    for server in entries:
        server.update(stratum_ok=not server['stratum'] == "16")
    return entries
def resolve_expected(entries):
    """ Look up the IP address of every expected NTP server
    Args:
        entries (list): The NTP servers (names or addresses) to resolve
    Returns:
        dict: Maps each server to its resolved IP, to None when the lookup
            raises socket.gaierror, or to the error text for any other exception
    """
    resolved = {}
    for server in entries:
        try:
            address = socket.gethostbyname(server)
        except socket.gaierror:
            address = None
        except Exception as error:  # pylint: disable=broad-except
            address = str(error)
        resolved[server] = address
    return resolved
def roll_up(summary):
    """ Fold the individual health checks into one overall verdict
    Args:
        summary (dict): The summary information; must hold the keys 'missing',
            'extra', 'stratums_ok', 'desired_ok', 'desired_no_dupes', 'chosen'
            and 'reachability_ok'
    Returns:
        dict: The same dict with 'healthy_failed_reasons' (list of notes, in
            check order) and 'healthy' (True when that list is empty) added
    """
    # (summary key, truthiness that means failure, note to report)
    checks = (
        ('missing', True,
         "One or more NTP servers is missing from the output."),
        ('extra', True,
         "One or more extra NTP servers found in the output."),
        ('stratums_ok', False,
         "One or more NTP servers found with a stratum of 16."),
        ('desired_ok', False,
         "One or more of the desired NTP servers did not resolve to an IP address."),
        ('desired_no_dupes', False,
         "Duplicate entries found in desired NTP server list."),
        ('chosen', False,
         "No 'chosen' NTP server found in output."),
        ('reachability_ok', False,
         "One or more NTP server is experiencing reachability issues."),
    )
    reasons = summary['healthy_failed_reasons'] = []
    for key, failing_truthiness, note in checks:
        if bool(summary[key]) is failing_truthiness:
            reasons.append(note)
    summary['healthy'] = not reasons
    return summary
def summarize(entries, desired, output):
    """ Build the summary dictionary from the parsed entries and desired servers
    Args:
        entries (list): The parsed NTP server entries; each needs the keys
            'remote', 'status', 'reach' and 'stratum_ok'
        desired (dict): The expected NTP servers mapped to their resolved IPs
        output (str): The raw output the entries were parsed from
    Returns:
        summary (dict): The summary dictionary, after being passed through roll_up
    """
    seen_ips = {item['remote'] for item in entries}
    wanted_ips = set(desired.values())

    # The first entry flagged "*" is the one the device has selected.
    chosen = None
    for item in entries:
        if item['status'] == "*":
            chosen = item
            break

    # Any reach value other than "377" counts as a reachability issue.
    unreachable = [item for item in entries if item['reach'] != "377"]

    summary = {
        'chosen': chosen,
        'current_ntp_ip_list': list(seen_ips),
        'desired_no_dupes': len(wanted_ips) == len(desired.values()),
        'desired_ntp_ip_list': list(wanted_ips),
        'desired_ok': all(desired.values()),
        'desired': desired,
        'entries': entries,
        'extra': list(seen_ips - wanted_ips),
        'missing': list(wanted_ips - seen_ips),
        'output': output.splitlines(),
        'reachability_issues': unreachable,
        'reachability_ok': not unreachable,
        'stratums_ok': all([item['stratum_ok'] for item in entries]),
    }
    return roll_up(summary)
def main():
    """ Module entry point: parse the NTP output and exit with the health summary
    """
    module = AnsibleModule(
        argument_spec={
            'ntp_servers': {'required': True, 'type': 'list'},
            'output': {'required': True, 'type': 'str'},
            'os': {'required': True, 'type': 'str'},
            'domain_name': {'required': False, 'type': 'str'},
        },
        supports_check_mode=True,
    )
    try:
        params = module.params
        # An NX-OS VDC whose clock is not NTP controlled has nothing to check
        if nxos_vdc(params['os'], params['output']):
            module.exit_json(changed=False, results="Is a NXOS VDC")
            return
        desired = resolve_expected(params['ntp_servers'])
        # These platforms are parsed with the IOS regex
        if params['os'] in ("cisco_xe", "cisco_asa", "cisco_fwsm", "cisco_pix"):
            params['os'] = "cisco_ios"
        # XR wraps long entries over two lines; join them before parsing
        if params['os'] == 'cisco_xr':
            params['output'] = xr_unwrap(params['output'])
        error, entries = parse_output(params)
        if error:
            module.fail_json(msg=error)
        # Arista reports names, so turn them back into IPs
        if params['os'] == "arista_eos":
            entries = arista_resolve(entries, params['domain_name'])
        summary = summarize(find_failed(entries), desired, params['output'])
        if not summary['healthy']:
            module.fail_json(msg=", ".join(summary['healthy_failed_reasons']), results=summary)
        else:
            module.exit_json(changed=False, results=summary)
    except Exception as error:  # pylint: disable=broad-except
        module.fail_json(msg="%s: %s" % (error.__class__.__name__, str(error)))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment