Skip to content

Instantly share code, notes, and snippets.

@flaviut
Created November 2, 2023 01:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save flaviut/c038ffb0aaff73defc9c93aa9de47cad to your computer and use it in GitHub Desktop.
Save flaviut/c038ffb0aaff73defc9c93aa9de47cad to your computer and use it in GitHub Desktop.
A simple, experimental NTP client written in Python.
import socket
import struct
import time
from dataclasses import dataclass
from enum import IntEnum
from itertools import islice
from typing import Iterator, Optional
# Hostname of the NTP server that main() connects to.
NTP_SERVER = "pool.ntp.org"
# Port passed to connect() alongside NTP_SERVER; 123 is NTP's well-known UDP port.
NTP_PORT = 123
# Seconds between the NTP epoch (1900-01-01) and the Unix epoch (1970-01-01):
# added when encoding timestamps and subtracted when decoding them.
TIME1970 = 2208988800 # 1970-01-01 in NTP epoch
class LeapIndicator(IntEnum):
    """Leap indicator: the two most significant bits of the first header byte.

    Member meanings follow RFC 5905; the values are the raw 2-bit field
    contents, so ``LeapIndicator(bits)`` converts directly from the wire.
    """

    NO_WARNING = 0
    LAST_MINUTE_61 = 1  # last minute of the day has 61 seconds
    LAST_MINUTE_59 = 2  # last minute of the day has 59 seconds
    ALARM_CONDITION = 3  # server clock is not synchronized
class Mode(IntEnum):
    """Association mode: the three least significant bits of the first header byte.

    All eight possible 3-bit values are covered, so ``Mode(flags & 0x07)``
    never raises.
    """

    RESERVED = 0
    SYMMETRIC_ACTIVE = 1
    SYMMETRIC_PASSIVE = 2
    CLIENT = 3  # used in requests sent by this client
    SERVER = 4
    BROADCAST = 5
    RESERVED_NTP_CONTROL = 6
    RESERVED_PRIVATE = 7
def unpack_pop(fmt, data_iter: Iterator[int]):
    """Consume ``struct.calcsize(fmt)`` bytes from *data_iter* and unpack them.

    Args:
        fmt: A ``struct`` format string.
        data_iter: Iterator yielding byte values (ints in 0-255), such as
            ``iter(some_bytes)``. It is advanced past the consumed bytes, so
            consecutive calls walk through a buffer field by field.

    Returns:
        The tuple produced by ``struct.unpack`` for *fmt*.

    Raises:
        struct.error: If the iterator is exhausted before enough bytes were
            read (instead of leaking a bare ``StopIteration`` to the caller).
    """
    size = struct.calcsize(fmt)
    # islice takes at most `size` items and leaves the rest of the iterator
    # untouched; bytes() assembles them in one pass rather than repeated `+=`.
    data = bytes(islice(data_iter, size))
    return struct.unpack(fmt, data)
@dataclass
class NtpPacket:
    """The fixed 48-byte NTP header.

    Timestamps are stored as float seconds relative to the Unix epoch; the
    conversion to and from the NTP epoch happens in ``encode``/``decode``.
    ``root_delay`` and ``root_dispersion`` are float seconds as well.
    """

    # Wire layout of the header (network byte order): flags, stratum,
    # poll, precision, root delay, root dispersion, reference ID, then four
    # timestamps of two 32-bit words each. Poll and precision are signed
    # 8-bit integers per RFC 5905. Not annotated, so it is not a dataclass field.
    _HEADER = struct.Struct('!BBbb3I8I')

    leap: LeapIndicator
    version: int  # This could be an enum as well, depending on your requirements
    mode: Mode
    stratum: int = 0
    poll: int = 0  # signed 8-bit on the wire
    precision: int = 0  # signed 8-bit on the wire
    root_delay: float = 0.0  # 16.16 fixed-point on the wire
    root_dispersion: float = 0.0  # 16.16 fixed-point on the wire
    reference_id: int = 0
    reference_timestamp: float = 0.0
    originate_timestamp: float = 0.0
    receive_timestamp: float = 0.0
    transmit_timestamp: float = 0.0
    # Four-character code from the reference ID field; only set when stratum is 0.
    kiss_of_death: Optional[str] = None

    @staticmethod
    def decode(data: bytes) -> 'NtpPacket':
        """Parse the first 48 bytes of *data* into an ``NtpPacket``.

        Args:
            data: Raw datagram payload. Bytes beyond the 48-byte header are
                ignored.

        Returns:
            The decoded packet. When the stratum is 0 the reference ID is
            additionally exposed as text in ``kiss_of_death``.

        Raises:
            struct.error: If *data* is shorter than the 48-byte header.
        """
        (flags, stratum, poll, precision, root_delay, root_dispersion,
         reference_id, *ts_words) = NtpPacket._HEADER.unpack_from(data)

        leap = LeapIndicator((flags >> 6) & 0x03)
        version = (flags >> 3) & 0x07
        mode = Mode(flags & 0x07)

        # Timestamps are in fixed-point format, with the integer part in the
        # first 32 bits and the fractional part in the last 32 bits.
        ts_names = ('reference_timestamp', 'originate_timestamp',
                    'receive_timestamp', 'transmit_timestamp')
        fields = {
            name: seconds + (fraction / 2.0 ** 32.0) - TIME1970
            for name, seconds, fraction
            in zip(ts_names, ts_words[0::2], ts_words[1::2])
        }

        kiss_of_death = None
        if stratum == 0:
            # Kiss-o'-Death: the code lives in the reference ID field as
            # left-justified, zero-padded ASCII (RFC 5905), not in the bytes
            # that directly follow the stratum.
            kiss_of_death = (struct.pack('!I', reference_id)
                             .decode('ascii', errors='replace')
                             .rstrip('\x00'))

        return NtpPacket(leap, version, mode, stratum, poll, precision,
                         root_delay / 2 ** 16, root_dispersion / 2 ** 16,
                         reference_id, kiss_of_death=kiss_of_death, **fields)

    def encode(self) -> bytes:
        """Serialize the packet into its 48-byte wire representation.

        ``kiss_of_death`` is not encoded separately; the bytes sent for the
        reference ID come from ``reference_id``.

        Raises:
            struct.error: If a field does not fit its wire width (for example
                a negative root delay or a timestamp before the NTP epoch).
        """
        flags = (int(self.leap) << 6) | (self.version << 3) | int(self.mode)

        # Convert each Unix-epoch timestamp to NTP seconds and split it into
        # the 32-bit integer part and the 32-bit binary fraction.
        ts_words = []
        for ts in (self.reference_timestamp, self.originate_timestamp,
                   self.receive_timestamp, self.transmit_timestamp):
            ntp_seconds = ts + TIME1970
            int_part = int(ntp_seconds)
            frac_part = int((ntp_seconds - int_part) * 2 ** 32)
            ts_words += (int_part, frac_part)

        return self._HEADER.pack(
            flags, self.stratum, self.poll, self.precision,
            int(self.root_delay * 2 ** 16),
            int(self.root_dispersion * 2 ** 16),
            self.reference_id,
            *ts_words,
        )
def send_ntp_request(client):
    """Send one client-mode NTP request and return the time it was stamped.

    Args:
        client: A connected socket; the encoded request is written with
            ``client.send``.

    Returns:
        A ``(client, t1)`` tuple, where ``t1`` is the local ``time.time()``
        reading placed in the request (T1 of the offset calculation).
    """
    # Getting the current time as T1
    t1 = time.time()
    request = NtpPacket(
        leap=LeapIndicator.NO_WARNING,
        version=3,
        mode=Mode.CLIENT,
        # RFC 5905: a client puts its send time in the Transmit Timestamp;
        # the server copies that value into the reply's Originate Timestamp.
        transmit_timestamp=t1,
    )
    client.send(request.encode())
    return client, t1
def receive_ntp_response(client, t1):
    """Read one NTP reply, print the derived clock statistics and return them.

    Args:
        client: Socket to read the reply from (via ``recvfrom``).
        t1: Local time at which the matching request was sent.

    Returns:
        ``(round_trip_delay, local_clock_offset, corrected_time)`` in seconds,
        or ``(None, None, None)`` when the reply is not exactly 48 bytes long
        or is a stratum-0 (Kiss-o'-Death) packet carrying no usable time.
    """
    data, _address = client.recvfrom(1024)
    # Take T4 immediately so that parsing time is not counted as network delay.
    t4 = time.time()

    if len(data) != 48:
        print(f"Unexpected packet size: {len(data)}")
        return None, None, None

    parsed = NtpPacket.decode(data)
    print(parsed)

    if parsed.stratum == 0:
        # The timestamps of such a reply are not meaningful; using them would
        # report a nonsensical offset.
        print(f"Kiss-o'-Death packet received: {parsed.kiss_of_death}")
        return None, None, None

    t2 = parsed.receive_timestamp  # T2
    t3 = parsed.transmit_timestamp  # T3

    # Compute the round trip delay and local clock offset
    round_trip_delay = (t4 - t1) - (t3 - t2)
    local_clock_offset = ((t2 - t1) + (t3 - t4)) / 2

    # Corrected time using local clock offset
    corrected_time = t4 + local_clock_offset

    print(f"Server's Time: {time.ctime(corrected_time)}")
    print(f"Round Trip Delay: {round_trip_delay} seconds")
    print(f"Local Clock Offset: {local_clock_offset} seconds")
    return round_trip_delay, local_clock_offset, corrected_time
def main():
    """Query the NTP server in five sessions of five requests each.

    Each session opens a fresh UDP socket with a 5-second timeout. A failed
    or timed-out query is reported and skipped rather than aborting the run.
    """
    for _session in range(5):
        # The context manager closes the socket even if a query raises.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as client:
            client.settimeout(5)
            client.connect((NTP_SERVER, NTP_PORT))

            for _query in range(5):
                try:
                    _, t1 = send_ntp_request(client)
                    receive_ntp_response(client, t1)
                except OSError as exc:
                    # Covers socket timeouts as well as errors reported on
                    # the connected UDP socket (e.g. connection refused).
                    print(f"NTP query failed: {exc}")
# Run the client only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment