-
-
Save rda0/e66d56235c5d3e7d467c33431cf667df to your computer and use it in GitHub Desktop.
A subset of ntqp in python based on ntp python lib, modified for python3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# pragma pylint: disable=bad-whitespace | |
############################################################################### | |
# ntpq.py - Python NTP control library. | |
# Copyright (C) 2017 Sven Mäder (@rda0 on github) | |
# Copyright (C) 2016 Peter C. Norton (@pcn on github) | |
# | |
# this addition to ntplib is free software; you can redistribute it and/or modify | |
# it under the terms of the GNU Lesser General Public License as published by the | |
# Free Software Foundation; either version 2 of the License, or (at your option) | |
# any later version. | |
# | |
# This program is distributed in the hope that it will be useful, but WITHOUT | |
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS | |
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more | |
# details. | |
# | |
# You should have received a copy of the GNU Lesser General Public License along | |
# with this program; if not, write to the Free Software Foundation, Inc., 59 | |
# Temple Place, Suite 330, Boston, MA 0.1.2-1307 USA | |
############################################################################### | |
"""Python NTP control client implementation | |
This emulates the action fo the ntpq library, specifically two of the | |
read commands, "associations" and "peers". | |
""" | |
import socket | |
import struct | |
import time | |
import sys | |
import ntplib | |
NTP_CONTROL_PACKET_FORMAT = "!B B H H H H H" #Maybe, maybe not | |
"""packet format to pack/unpack""" | |
NTP_CONTROL_OPCODES = { | |
"readstat" : 1, | |
"readvar" : 2 | |
} | |
# From the ntp distributions ntpq/ntp_control.h | |
#define CTL_PST_SEL_REJECT 0 /* reject */ | |
#define CTL_PST_SEL_SANE 1 /* x falsetick */ | |
#define CTL_PST_SEL_CORRECT 2 /* . excess */ | |
#define CTL_PST_SEL_SELCAND 3 /* - outlier */ | |
#define CTL_PST_SEL_SYNCCAND 4 /* + candidate */ | |
#define CTL_PST_SEL_EXCESS 5 /* # backup */ | |
#define CTL_PST_SEL_SYSPEER 6 /* * sys.peer */ | |
#define CTL_PST_SEL_PPS 7 /* o pps.peer */ | |
NTP_PEER_SELECTION = { | |
# This is the "peer_selection" in data returned from a readvar request | |
# | |
"reject" : 0, | |
"sane" : 1, | |
"correct" : 2, | |
"selcand" : 3, | |
"synccand" : 4, | |
"excess" : 5, | |
"sys.peer" : 6, | |
"pps.peer" : 7 | |
} | |
# packet format to pack/unpack the control header | |
CONTROL_PACKET_FORMAT = "!B B H H H H H" | |
class NTPException(Exception): | |
"""Exception raised by this module.""" | |
pass | |
def composite_assoc_and_peer(host="127.0.0.1"): | |
""" | |
returns a list of associations from the host, | |
combined with the peer data. | |
This data is a mixture of the data that is gotten | |
from the commands 'ntpq -c pe' and 'ntpq -c as' | |
This was my main goal in writing this. | |
""" | |
# ncc = NTPControlClient() | |
# ncp = ncc.request(host, op="readstat") | |
data = list() | |
ncp_data = ntp_control_request(host, op="readstat") | |
for assoc in ncp_data['associations']: | |
readvar_data = ntp_control_request( | |
host, op="readvar", association_id=assoc['association_id']) | |
for key, val in list(assoc.items()): | |
readvar_data[key] = val | |
data.append(readvar_data) | |
return data | |
def decode_association(data): | |
""" | |
Provided a 2 uchar of data, unpack the first uchar of associationID, | |
and the second uchar of association data from that uchar | |
test with e.g. data set to: | |
In [161]: struct.pack("!B B", 0b00010100,0b00011010) | |
Out[161]: '\x14\x1a' | |
This is the data for a single association. | |
""" | |
unpacked = struct.unpack("!H B B", data) | |
return { | |
'association_id' : unpacked[0], | |
'peer_config' : unpacked[1] >> 7 & 0x1, | |
'peer_authenable' : unpacked[1] >> 6 & 0x1, | |
'peer_authentic' : unpacked[1] >> 5 & 0x1, | |
'peer_reach' : unpacked[1] >> 4 & 0x1, | |
'reserved' : unpacked[1] >> 3 & 0x1, | |
'peer_selection' : unpacked[1] & 0x7, | |
'peer_event_counter' : unpacked[2] >> 4 & 0xf, | |
'peer_event_code' : unpacked[2] & 0xf | |
} | |
def control_data_payload(version=2, op='readstat', association_id=0, sequence=1): | |
"""Convert the requested arguments into a buffer that can be sent over a socket. | |
to an ntp server. | |
Returns: | |
buffer representing this packet | |
Raises: | |
NTPException -- in case of invalid field | |
""" | |
leap = 0 # leap second indicator | |
version = version # protocol version | |
mode = 6 # mode 6 is the control mode | |
response_bit = 0 # request | |
error_bit = 0 | |
more_bit = 0 | |
opcode = NTP_CONTROL_OPCODES[op] | |
sequence = sequence | |
status = 0 | |
association_id = association_id | |
offset = 0 | |
count = 0 | |
try: | |
packed = struct.pack( | |
NTP_CONTROL_PACKET_FORMAT, | |
(leap << 6 | version << 3 | mode), | |
(response_bit << 7 | error_bit << 6 | more_bit << 5 | opcode), | |
sequence, | |
status, | |
association_id, | |
offset, | |
count) | |
return packed | |
except struct.error: | |
raise NTPException("Invalid NTP packet fields.") | |
def ntp_control_request(host, version=2, port='ntp', # pylint: disable=too-many-arguments,invalid-name | |
op="readvar", association_id=0, timeout=5): | |
"""Query a NTP server. | |
Parameters: | |
host -- server name/address | |
version -- NTP version to use | |
port -- server port | |
timeout -- timeout on socket operations | |
Returns: | |
dictionary with ntp control info. Specific data will vary based on the request type. | |
""" | |
# lookup server address | |
addrinfo = socket.getaddrinfo(host, port)[0] | |
family, sockaddr = addrinfo[0], addrinfo[4] | |
# create the socket | |
sock = socket.socket(family, socket.SOCK_DGRAM) | |
try: | |
sock.settimeout(timeout) | |
# create a control request packet | |
sock.sendto( | |
control_data_payload( | |
op=op, version=version, | |
association_id=association_id), | |
sockaddr) | |
# wait for the response - check the source address | |
src_addr = None, | |
while src_addr[0] != sockaddr[0]: | |
response_packet, src_addr = sock.recvfrom(512) | |
# build the destination timestamp | |
# dest_timestamp = ntplib.system_to_ntp_time(time.time()) | |
except socket.timeout: | |
raise NTPException("No response received from %s." % host) | |
finally: | |
sock.close() | |
packet_dict = control_packet_from_data(response_packet) | |
return packet_dict | |
def control_packet_from_data(data): | |
"""Populate this instance from a NTP packet payload received from | |
the network. | |
Parameters: | |
data -- buffer payload | |
Returns: | |
dictionary of control packet data. | |
Raises: | |
NTPException -- in case of invalid packet format | |
""" | |
def decode_readstat(header_len, data, rdata): | |
""" | |
Decodes a readstat request. Augments rdata with | |
association IDs from data | |
""" | |
rdata['associations'] = list() | |
for offset in range(header_len, len(data), 4): | |
assoc = data[offset:offset+4] | |
association_dict = decode_association(assoc) | |
rdata['associations'].append(association_dict) | |
return rdata | |
def decode_readvar(header_len, data, rdata): | |
""" | |
Decodes a redvar request. Augments rdata dictionary with | |
the textual data int he data packet. | |
""" | |
def to_time(integ, frac, n=32): # pylint: disable=invalid-name | |
"""Return a timestamp from an integral and fractional part. | |
Having this here eliminates using an function internal to | |
ntplib. | |
Parameters: | |
integ -- integral part | |
frac -- fractional part | |
n -- number of bits of the fractional part | |
Retuns: | |
float seconds since the epoch/ aka a timestmap | |
""" | |
return integ + float(frac)/2**n | |
data = str(data).replace("\\r\\n"," ") | |
buf = data[header_len:].split(",") | |
for field in buf: | |
try: | |
key, val = field.replace("\r\n", "").lstrip().split("=") | |
except ValueError as ve: | |
sys.stderr.write("Got {} trying to unpack {}\n".format(ve, field)) | |
sys.stderr.write("as part of {}\n".format(buf)) | |
continue | |
if key in ('rec', 'reftime'): | |
int_part, frac_part = [ int(x, 16) for x in val.split(".") ] | |
rdata[key] = ntplib.ntp_to_system_time( | |
to_time(int_part, frac_part)) # pylint: disable=protected-access | |
else: | |
rdata[key] = val | |
# For the equivalent of the 'when' column, in ntpq -c pe | |
# I believe that the time.time() minus the 'rec' matches that value. | |
# Causes exception if associd = 0 | |
#rdata['when'] = time.time() - rdata['rec'] | |
return rdata | |
try: | |
header_len = struct.calcsize(CONTROL_PACKET_FORMAT) | |
unpacked = struct.unpack(CONTROL_PACKET_FORMAT, data[0:header_len]) | |
except struct.error: | |
raise NTPException("Invalid NTP packet.") | |
# header status | |
rdata = { | |
"leap_header" : unpacked[0] >> 6 & 0x1, | |
"version" : unpacked[0] >> 3 & 0x7, | |
"mode" : unpacked[0] & 0x7, # end first uchar | |
"response_bit" : unpacked[1] >> 7 & 0x1, | |
"error_bit" : unpacked[1] >> 6 & 0x1, | |
"more_bit" : unpacked[1] >> 5 & 0x1, | |
"opcode" : unpacked[1] & 0x1f, # end second uchar | |
"sequence" : unpacked[2], | |
"leap" : unpacked[3] >> 14 & 0x1, | |
"clocksource" : unpacked[3] >> 8 & 0x1f, # 6 bit mask | |
"system_event_counter" : unpacked[3] >> 4 & 0xf, | |
"system_event_code" : unpacked[3] & 0xf, # End first ushort | |
"association_id" : unpacked[4], | |
"offset" : unpacked[5], | |
"count" : unpacked[6] | |
} | |
opcodes_by_number = { v:k for k, v in list(NTP_CONTROL_OPCODES.items()) } | |
if opcodes_by_number[rdata['opcode']] == "readstat": | |
return decode_readstat(header_len, data, rdata) | |
elif opcodes_by_number[rdata['opcode']] == "readvar": | |
return decode_readvar(header_len, data, rdata) | |
## Example usage | |
# | |
# print(composite_assoc_and_peer()) | |
# | |
# host = '127.0.0.1' | |
# data = list() | |
# | |
# ncp_data = ntp_control_request(host, op="readstat") | |
# | |
# systemvars = ntp_control_request(host, op="readvar") | |
# print('') | |
# print(systemvars) | |
# | |
# for assoc in ncp_data['associations']: | |
# readvar_data = ntp_control_request( | |
# host, op="readvar", association_id=assoc['association_id']) | |
# for key, val in list(assoc.items()): | |
# readvar_data[key] = val | |
# data.append(readvar_data) | |
# | |
# for peer in data: | |
# print('') | |
# print(peer) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment