Skip to content

Instantly share code, notes, and snippets.

@VinDuv
Created February 2, 2024 06:09
Show Gist options
  • Save VinDuv/008cffa620bf09db68683eae17a5eaff to your computer and use it in GitHub Desktop.
Save VinDuv/008cffa620bf09db68683eae17a5eaff to your computer and use it in GitHub Desktop.
ACTS time sync emulator for Atomic Clock 2.0
#!/usr/bin/env python3
"""
Simulates a modem that will connect to an ACTS time sync server.
"""
# https://www.nist.gov/pml/time-and-frequency-division/time-distribution/
# automated-computer-time-service-acts
# https://pdfs.semanticscholar.org/65b9/d62c0c0002781d22e00ff6456c8b1baa4420.pdf
from datetime import datetime, timedelta
import argparse
import re
import socket
import selectors
import time
import typing
# Timestamp carried by the first emitted time code when ModemEmulator is not
# given an explicit one (it is the default of its `tstamp` argument).
DEFAULT_DATE = datetime(1988, 3, 2, 21, 39, 15)
# Default value of the label field placed in every emitted time code line.
LABEL = 'UTC(NIST)'
# Reference date subtracted from a timestamp to obtain the day count sent in
# the MJD (Modified Julian Date) field of a time code line.
MJD_EPOCH = datetime(1858, 11, 17)
def run():
    """
    Program entry point.

    Builds the command-line parser (one sub-command per registered I/O
    class), creates the selected I/O handler and runs the modem emulation on
    it until interrupted with Ctrl-C. The I/O handler is always closed on the
    way out.
    """
    arg_parser = argparse.ArgumentParser(description=__doc__)
    ModemIOBase.register_subparsers(arg_parser)
    modem_io = ModemIOBase.create(arg_parser.parse_args())

    try:
        ModemEmulator(modem_io).run()
    except KeyboardInterrupt:
        # Move to a fresh line after the "^C" echoed by the terminal
        print("")
    finally:
        modem_io.close()
class ModemEmulator:
    """
    Emulates a modem connected to a time sync server.

    The emulator reads AT commands from an I/O handler (an object providing
    setup(), read(timeout) and write(data)), answers them like a modem would,
    and once a number is dialed sends a short ACTS session made of a banner
    followed by eight time code lines.
    """

    class Reset(Exception):
        """
        Exception thrown when an AT Z is received. Used to reset the program
        state to the beginning.
        """

    class ACTS(typing.NamedTuple):
        """
        Data contained in an ACTS time sync line.
        """
        tstamp: datetime  # UTC timestamp
        dst: int  # Number of days before/after US DST (roughly)
        leap: bool  # Indicates if a leap second occurs this month
        dut1: float  # UTC to UT1 conversion factor, between -.8 and +.8
        # Current time code advance (to compensate line delay), in ms. Sent
        # with one decimal, so fractional values such as 37.6 are valid.
        adv: float
        label: str  # Label indicating the timezone and time server name
        otm: str  # Character sent when the timestamp occurs

    def __init__(self, io_handler, label=LABEL, tstamp=DEFAULT_DATE):
        """
        io_handler: object used to exchange bytes with the program driving
        the emulated modem.
        label: text placed in the label field of each time code line.
        tstamp: timestamp carried by the first time code line of a session.
        """
        self._io = io_handler
        self._line_buf = b''  # Bytes received but not yet returned as a line
        self._label = label
        self._tstamp = tstamp

    def run(self):
        """
        Sets the I/O handler up and runs the emulation.

        Never returns normally: after each session (or each reset) the
        emulator goes back to waiting for commands.
        """
        self._io.setup()
        print("Waiting for initial AT Z...")
        self._wait_reset()

        while True:
            try:
                self._run()
                self._wait_reset()
            except self.Reset:
                # _expect() saw an AT Z in the middle of a session;
                # acknowledge it and start the session over.
                print("Got a reset command, restarting.")
                self._send_line('OK')

    def _run(self):
        """
        Runs the emulation. Raises Reset if an AT Z is received.
        """
        # Modem initialization commands, each one acknowledged with OK
        init_commands = (
            r'AT &F',
            r'AT V1 E1 Q0 X4 &C1 &D2',
            r'AT &Q0',
            r'AT S95=44',
            r'AT M. B.',
        )
        for command in init_commands:
            self._expect(command)
            self._send_line('OK')

        number, = self._expect(r'ATDT (.*)')
        print(f"Dialed phone number: {number}")

        # Pretend the call takes a moment to be established
        time.sleep(2)
        self._send_line('CONNECT1200')
        time.sleep(.5)

        self._send_line('?=Help')
        self._send_line('National Institute of Standards and Technology')
        self._send_line('Telephone Time Service')
        time.sleep(1)
        self._send_line(' D L D')
        self._send_line('MJD YR MO DA H M S ST S UT1 msADV OTM')

        # One time code per second: four with a 45 ms advance and the '*'
        # on-time marker, then four with a 37.6 ms advance and '#'.
        cur_tstamp = self._tstamp
        incr = timedelta(seconds=1)
        for adv, otm in ((45, '*'), (37.6, '#')):
            for _ in range(4):
                acts = self.ACTS(cur_tstamp, dst=83, leap=False, dut1=.3,
                                 adv=adv, label=self._label, otm=otm)
                self._send_acts(acts)
                cur_tstamp += incr
                time.sleep(1)

        # Escape sequence followed by the hang-up command
        self._expect(r'\+\+\+ATH0')

    def _expect(self, regex, timeout=None):
        """
        Expects a specific value (as a regex) from the modem.
        Returns the regular expression groups.
        If the match fails, ignore the line (unless it's AT Z, in which case
        Reset is raised)
        """
        while True:
            # Echo is disabled because the line is printed below together
            # with its status.
            line = self._get_line(timeout=timeout, echo=False)
            match = re.match(regex, line)
            if match:
                print(f"=> {line}")
                return match.groups()

            if line.replace(' ', '') == 'ATZ':
                print(f"=> {line} (reset!)")
                raise self.Reset()

            print(f"=> {line} (unexpected)")

    def _send_acts(self, acts):
        """
        Send an ACTS sync line.

        acts: the ACTS tuple to format and send.
        """
        tstamp, dst, leap, dut1, adv, label, otm = acts

        mjd = (tstamp - MJD_EPOCH).days
        date_str = tstamp.strftime('%y-%m-%d %H:%M:%S')
        # DUT1 is sent as an explicit sign, a dot and a single tenths digit
        dut1 = f"{'+' if dut1 >= 0 else '-'}.{round(abs(dut1 * 10)):1d}"

        self._send_line(f'{mjd:05d} {date_str} {dst:02d} {leap:d} {dut1} '
                        f'{adv:05.1f} {label} {otm}')

    def _send_line(self, line):
        """
        Sends a line, followed by \\r\\n.
        """
        print(f"<= {line}")
        self._io.write(line.encode('ascii') + b'\r\n')

    def _get_line(self, timeout=None, echo=True):
        """
        Receives a full line from the I/O, with a timeout.

        A line ends with a carriage return, optionally followed by a line
        feed. The terminator is not part of the returned string. The timeout
        is handed to the I/O handler's read() for each read; whatever that
        call raises is propagated. If echo is true the line is printed.
        """
        while True:
            # The LF of a CR LF pair may arrive in a later read than its CR;
            # it then sits at the head of the buffer and must not become
            # part of the next line.
            self._line_buf = self._line_buf.lstrip(b'\n')

            raw_line, terminator, remainder = self._line_buf.partition(b'\r')
            if terminator:
                # Only consume a LF when one actually follows the CR, so
                # that a bare CR terminator does not eat the first character
                # of the next command.
                if remainder.startswith(b'\n'):
                    remainder = remainder[1:]
                self._line_buf = remainder

                line = raw_line.decode('ascii', 'replace')
                if echo:
                    print(f"=> {line}")
                return line

            self._line_buf += self._io.read(timeout)

    def _wait_reset(self):
        """
        Wait for ATZ to be sent to the modem.

        Every other line is discarded. The ATZ is acknowledged with OK.
        """
        while True:
            line = self._get_line().replace(' ', '')
            if line == 'ATZ':
                self._send_line('OK')
                break

    def __repr__(self):
        return f"ModemEmulator({self._io!r})"
class ModemIOBase:
    """
    Base class for modem I/O.

    Concrete subclasses pass a `name` (the command-line sub-command that
    selects them) and a `title` (its help text) as class keyword arguments;
    they are then offered automatically by register_subparsers().
    """

    # Maps a sub-command name to a (title, class) tuple. Shared by the whole
    # class hierarchy.
    _classes = {}

    def __init_subclass__(cls, name=None, title=None):
        """
        Called for each subclass of ModemIOBase; registers the classes in a
        dictionary so they can be registered in the command-line parser.

        A subclass whose name is None (or omitted) is considered abstract
        and is not registered.
        Raises ValueError if the name is already used by another class.
        """
        super().__init_subclass__()

        if name is None:
            # Abstract subclass
            return

        # An explicit check rather than an assert, so that a name clash is
        # still detected when Python runs with assertions disabled (-O).
        if name in cls._classes:
            raise ValueError(f"Modem I/O name {name!r} is already registered")

        cls._classes[name] = (title, cls)

    def setup(self):
        """
        Prepares the instance. When this method returns, the read() and write()
        method can be called.
        """
        # The base implementation does nothing

    def read(self, timeout=None):
        """
        Receive data from the program writing to the COM port and returns it.
        Raises TimeoutError if the timeout expires with no data received.
        """
        raise NotImplementedError("Must be implemented in subclasses")

    def write(self, data):
        """
        Send data to the program reading from the COM port.
        """
        raise NotImplementedError("Must be implemented in subclasses")

    def close(self):
        """
        Releases the resources used by the I/O.
        """
        # The base implementation does nothing

    @classmethod
    def register_subparsers(cls, parser):
        """
        Registers the I/O subclasses as sub-parsers. This allow each subclass
        to have its own command-line options.

        The class handling a sub-command is stored in the `cls` attribute of
        the parsed arguments, where create() looks it up.
        """
        subparsers = parser.add_subparsers(
            metavar='port_type', required=True)

        for name, (title, subclass) in cls._classes.items():
            subparser = subparsers.add_parser(name, help=title)
            subclass.register_options(subparser)
            subparser.set_defaults(cls=subclass)

    @staticmethod
    def create(args):
        """
        Create an instance of the class from the parsed command-line arguments.
        """
        return args.cls.create_from_args(args)

    @classmethod
    def create_from_args(cls, args):
        """
        Create an instance of the sub-class from the parsed command-line
        arguments.
        """
        raise NotImplementedError("Must be implemented in subclasses")

    @classmethod
    def register_options(cls, subparser):
        """
        Registers the command-line options used by the I/O subclass.
        """
        # The base implementation does not register any options
class ModemIOTCPBase(ModemIOBase, name=None, title=None):
    """
    Base class for TCP modem I/O.

    Subclasses only have to provide _get_connection(), which returns a
    connected socket; this class takes care of reading, writing, closing and
    of re-establishing the connection when the remote end goes away.
    """

    def __init__(self, host, port):
        self._read_buf = b''
        self._host = host
        self._port = port
        self._socket = None
        self._selector = None

    def setup(self):
        """
        Drops the current connection (if any) and establishes a new one.
        """
        self.close()

        conn = self._get_connection()
        conn.setblocking(False)

        # The selector is what implements the read timeout
        selector = selectors.DefaultSelector()
        selector.register(conn, selectors.EVENT_READ)

        self._socket = conn
        self._selector = selector

    def read(self, timeout=None):
        """
        Returns the bytes received from the remote end.

        Raises TimeoutError if nothing is received within `timeout` seconds
        (None waits forever). If the remote end closed or reset the
        connection, a new connection is set up and an empty byte string is
        returned.
        """
        if not self._selector.select(timeout):
            raise TimeoutError(f"No data received after {timeout} seconds")

        try:
            data = self._socket.recv(4096)
        except ConnectionResetError:
            # An abrupt disconnection is handled like an orderly one instead
            # of terminating the emulator.
            data = b''

        if not data:
            print("Connection closed by remote")
            self.setup()

        return data

    def write(self, data):
        """
        Sends all the given bytes to the remote end.
        """
        self._socket.sendall(data)

    def close(self):
        """
        Closes the socket and its selector. Does nothing if not connected.
        """
        if self._socket is not None:
            self._socket.close()
            self._selector.close()
            self._socket = None
            self._selector = None

    @classmethod
    def create_from_args(cls, args):
        """
        Creates an instance from the `address` command-line argument, which
        is expected to be a (host, port) tuple as returned by address().
        """
        host, port = args.address
        return cls(host, port)

    @classmethod
    def address(cls, value):
        """
        Parses an address string into a (host, port) tuple.

        The string is either "<host>:<port>" or just "<port>", in which case
        the host is 127.0.0.1. Raises argparse.ArgumentTypeError if the port
        is not an integer between 1 and 65535.
        """
        host, _, raw_port = value.rpartition(':')
        if not host:
            host = '127.0.0.1'

        try:
            port = int(raw_port)
        except ValueError:
            # Not a number; let the range check below report it
            port = -1

        if not 0 < port < 65536:
            raise argparse.ArgumentTypeError(f"Invalid port {raw_port}")

        return (host, port)

    def _get_connection(self):
        """
        Connect to the remote end, and return the connected socket.
        """
        raise NotImplementedError("Must be implemented in subclasses")
class ModemIOTCPClient(ModemIOTCPBase, name='tcp', title="TCP Client"):
    """
    Modem I/O acting as a TCP client: it connects to a server listening on
    the configured host and port. Useful with qemu's -serial tcp option.
    """

    def _get_connection(self):
        """
        Connects to the configured address and returns the connected socket.
        A refused connection is retried every two seconds, indefinitely.
        """
        target = (self._host, self._port)
        print(f"Connecting to {self._host}:{self._port}... ", end="",
              flush=True)

        while True:
            try:
                sock = socket.create_connection(target)
            except ConnectionRefusedError:
                # Nothing is listening yet; wait a bit and try again
                time.sleep(2)
            else:
                print("Connected.")
                return sock

    @classmethod
    def register_options(cls, subparser):
        """
        Adds the positional `address` argument, parsed with address().
        """
        subparser.add_argument(
            'address',
            type=cls.address,
            help="Connect address (<host>:<port> or <port> for localhost)")
class ModemIOTCPServer(ModemIOTCPBase, name='tcps', title="TCP Server"):
    """
    Modem I/O acting as a TCP server: it listens on the configured host and
    port and uses the first connection it receives. Useful with qemu's
    -serial tcp option.
    """

    def _get_connection(self):
        """
        Listens on the configured address, waits for one incoming connection
        and returns its socket. The listening socket is closed afterwards.
        """
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        with listener:
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
            listener.bind((self._host, self._port))
            listener.listen(1)

            print(f"Waiting for connection on {self._host}:{self._port}... ",
                  end="", flush=True)
            client_sock, addr = listener.accept()
            print(f"Got connection from {addr[0]}:{addr[1]}.")

        return client_sock

    @classmethod
    def register_options(cls, subparser):
        """
        Adds the positional `address` argument, parsed with address().
        """
        subparser.add_argument(
            'address',
            type=cls.address,
            help="Listen address (<host>:<port> or <port>)")
# Only start the emulator when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment