Created
March 20, 2024 02:53
-
-
Save nobrowser/285d63410b9c2aec77a53587020cf2b6 to your computer and use it in GitHub Desktop.
A daemon to read data off an Adafruit/Mediatek GPS module (over I2C) and feed it to chrony refclock SOCK
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import board | |
import adafruit_gps | |
from struct import Struct | |
from time import time, sleep | |
from calendar import timegm | |
import sys | |
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter | |
from socket import socket, socketpair, AF_UNIX, SOCK_DGRAM | |
import json | |
from dataclasses import dataclass, asdict, astuple | |
import os | |
from statistics import mean, stdev, median | |
from math import floor, isnan | |
# Native-layout description of the sample chronyd's SOCK refclock driver
# reads: struct timeval tv; double offset; int pulse; int leap; int _pad;
# int magic.  The two timeval members are C longs, so they are packed with
# "l"; "i" only produces the right packet size where long is 32 bits wide.
REFCLOCK_SOCK_FORMAT = "@lldiiii"
# ASCII "SOCK", the protocol identifier carried in the last field
REFCLOCK_SOCK_MAGIC = 0x534f434b

# the 1 bit turns on RMC
GPS_SELECT_CMD = "PMTK314,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0"
# second command sent during setup, after the sentence selection
GPS_HZ_CMD = "PMTK220,1000"
# same PMTK314 command with the RMC bit cleared as well; sent on shutdown
GPS_RESET_CMD = "PMTK314,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0"

# default path of the datagram socket the samples are sent to
CHRONY = "/run/mtk_refclock/mtk_refclock.sock"
# Removal of this file serves as a signal to exit
SEMA = "/run/mtk_refclock/semaphore"
@dataclass
class RefclockData:
    """One sample in the shape expected by the refclock socket.

    Field order is significant: ``astuple()`` of an instance is handed
    directly to ``formatter.pack()``, so the fields must appear in the same
    order as the items of REFCLOCK_SOCK_FORMAT.
    """

    # system time of the measurement, split into seconds and microseconds
    tv_sec: int
    tv_usec: int
    # offset reported for that instant, in seconds
    offset: float
    # three integer slots that this program always leaves at zero
    z1: int = 0
    z2: int = 0
    z3: int = 0
    # protocol identifier, packed last
    magic: int = REFCLOCK_SOCK_MAGIC


# precompiled packer used to serialise RefclockData tuples
formatter = Struct(REFCLOCK_SOCK_FORMAT)
def frac_part(f: float):
    """Return what is left of *f* after subtracting floor(f).

    Because the floor is used, the result for a negative input is measured
    upwards from the next lower integer (e.g. -1.25 gives 0.75).
    """
    whole = floor(f)
    return f - float(whole)
def milli(f: float):
    """Return the sub-second part of *f* as a whole number of microseconds.

    Despite the name, the scale factor is one million, not one thousand;
    the result is used to fill the ``tv_usec`` field of a sample.
    """
    fraction = f - float(floor(f))
    return floor(fraction * 1_000_000.0)
def is_not_none(a):
    """Return True for every value except the ``None`` singleton."""
    return not (a is None)
# GPS subclass that remembers when each sentence started to arrive, so the
# difference between system time and the GPS timestamp can be computed
class MtkGPS(adafruit_gps.GPS_GtopI2C):
    """I2C GPS driver that timestamps incoming sentences with system time."""

    def __init__(self):
        """Open the board's I2C bus and reset the bookkeeping state."""
        super().__init__(board.I2C())
        # time() taken right after the first byte of the latest sentence
        self.last_systime = None
        # GPS timestamp that update() last reported as new
        self.last_datetime = None
        # file descriptor of the underlying I2C device, wrapped in a list
        self.rlist = [self._i2c.i2c._i2c._i2c_bus._device.fileno()]
        # becomes True once update() has been called for the first time
        self.updated = False

    def send_string(self, s: str):
        """Encode *s* as ASCII and hand it to send_command()."""
        self.send_command(bytes(s, "ascii"))

    def mtk_setup(self):
        """Send the sentence selection, pause half a second, then send the rate command."""
        self.send_string(GPS_SELECT_CMD)
        sleep(0.5)
        self.send_string(GPS_HZ_CMD)

    def mtk_reset(self):
        """Send the command that clears the sentence selection again."""
        self.send_string(GPS_RESET_CMD)

    def readline(self):
        """Read one line, noting the system time at which it started.

        Spins until read(1) yields a byte, stamps ``last_systime`` right
        after it, pushes the byte into the internal buffer and lets the
        base class finish the line.  The hope is that this instant is a
        decent approximation to the actual second boundary.
        """
        while True:
            first = self.read(1)
            if first:
                break
        self.last_systime = time()
        self._internalbuffer.append(first[0])
        return super().readline()

    def update(self):
        """Poll the GPS; return True only if its timestamp has advanced.

        Every call but the very first sleeps for half a second before
        polling.  A successful base-class update is still reported as False
        when there is no timestamp yet or it equals the previous one.
        """
        if self.updated:
            sleep(0.500)
        else:
            self.updated = True
        if not super().update():
            return False
        stamp = self.datetime
        if not stamp or stamp == self.last_datetime:
            return False
        self.last_datetime = stamp
        return True

    def get_offset(self):
        """Return ``last_systime`` minus the GPS timestamp, in seconds."""
        gps_seconds = timegm(self.datetime)
        return self.last_systime - gps_seconds
# there is a systematic bias in the offset values which must be corrected
# or else chrony will reject our source. This is how we do it
class DelayStats:
    """Fixed-size ring buffer of recent delay samples with summary statistics.

    The statistics methods (mean, stdev, stable_median) must only be called
    once is_ready() returns True; before that the buffer still holds None
    placeholders.
    """

    def __init__(self, max_delays: int):
        """Create an empty buffer holding up to *max_delays* samples.

        Raises:
            ValueError: if *max_delays* is below 2.  A standard deviation
                needs at least two samples, and a non-positive size would
                fail on the first add_delay() or never become ready.
        """
        if max_delays < 2:
            raise ValueError(f"max_delays must be at least 2, got {max_delays}")
        self.max_delays = max_delays
        self.delays = [None] * max_delays
        # index of the slot the next sample overwrites
        self.next_delay = 0
        # cached mean of the buffer; None whenever it has to be recomputed
        self.mean_delay = None

    def add_delay(self, delay: float):
        """Store *delay*, overwriting the oldest slot; NaN values are ignored."""
        if isnan(delay):
            return
        self.delays[self.next_delay] = delay
        self.next_delay = (self.next_delay + 1) % self.max_delays
        self.mean_delay = None

    def is_ready(self):
        """Return True once every slot of the buffer holds a sample."""
        return None not in self.delays

    def mean(self):
        """Return the mean of the buffer, cached until the next add_delay()."""
        if self.mean_delay is None:
            self.mean_delay = mean(self.delays)
        return self.mean_delay

    def stdev(self):
        """Return the sample standard deviation of the buffer."""
        return stdev(self.delays, xbar=self.mean())

    def stable_median(self):
        """Return the median of the samples within two standard deviations of the mean."""
        centre, spread = self.mean(), self.stdev()
        low, high = centre - 2.0 * spread, centre + 2.0 * spread
        return median([d for d in self.delays if low <= d <= high])
def _next_sample(gps, stats, correction):
    """Poll the GPS once and return a RefclockData to publish, or None.

    None is returned when the GPS has nothing new, when the statistics
    buffer is not yet full, or when the buffered offsets are too scattered.
    """
    if not gps.update():
        return None
    systime, offset = gps.last_systime, gps.get_offset()
    stats.add_delay(offset)
    # choose stdev bound so all delays we use are < 100ms absolute
    if not stats.is_ready() or stats.stdev() > 0.050:
        return None
    # NOTE(review): the value sent is the filtered median of get_offset()
    # (system time minus GPS time) less the correction -- confirm the sign
    # convention against chrony's SOCK refclock documentation.
    coffset = stats.stable_median() - correction
    return RefclockData(floor(systime), milli(systime), coffset)


def main_loop(argv, gps):
    """Feed GPS samples to the refclock socket until told to stop.

    Args:
        argv: parsed options; reads ``max_delays``, ``correction`` (in
            milliseconds), ``not_really`` and ``socket``.
        gps: object providing ``update()``, ``last_systime`` and
            ``get_offset()``.

    With ``not_really`` set, samples are printed to stdout as JSON lines,
    the socket is never connected and the loop only ends on an exception
    (e.g. KeyboardInterrupt).  Otherwise samples are sent to ``argv.socket``
    and the loop ends as soon as the semaphore file is gone; this is checked
    on every iteration, not only after a successful send, so the daemon can
    be stopped even while the GPS has no usable fix.
    """
    stats = DelayStats(argv.max_delays)
    # loop-invariant: the correction converted from milliseconds to seconds
    correction = float(argv.correction) / 1_000.0
    with socket(AF_UNIX, SOCK_DGRAM) as sock:
        if not argv.not_really:
            sock.connect(argv.socket)
        while True:
            rd = _next_sample(gps, stats, correction)
            if argv.not_really:
                if rd is not None:
                    sys.stdout.write(json.dumps(asdict(rd)) + "\n")
                continue
            if rd is not None:
                try:
                    sock.sendall(formatter.pack(*astuple(rd)))
                except Exception as e:
                    # best effort: report the failure and keep running
                    sys.stderr.write(f"{e}\n")
            if not os.access(SEMA, os.F_OK):
                break
def main():
    """Parse the command line, configure the GPS and run the main loop.

    The GPS is reset on the way out whether the loop ends normally, through
    Ctrl-C or through an error.  Setup runs inside the same try block so
    that an interruption during setup is also followed by the reset.
    """
    argp = ArgumentParser(
        prog="mtk_refclock",
        description="Simple GPS refclock",
        formatter_class=ArgumentDefaultsHelpFormatter,
    )
    argp.add_argument(
        "-n", "--not_really",
        action="store_true",
        help="Run a fake clock printing packets to stdout",
    )
    argp.add_argument(
        "-s", "--socket",
        metavar="PATH",
        default=CHRONY,
        help="Send packets to socket at PATH",
    )
    argp.add_argument(
        "-m", "--max_delays",
        metavar="N",
        type=int,
        default=60,
        help="Collect N samples to determine fractional second delay",
    )
    argp.add_argument(
        "-c", "--correction",
        metavar="N",
        type=int,
        default=0,
        help="Add a constant correction of N milliseconds",
    )
    argv = argp.parse_args()
    # a standard deviation needs two samples; reject smaller windows before
    # any hardware is touched (argp.error prints usage and exits)
    if argv.max_delays < 2:
        argp.error("--max_delays must be at least 2")

    gps = MtkGPS()
    try:
        gps.mtk_setup()
        main_loop(argv, gps)
    except KeyboardInterrupt:
        # this is just to avoid the pointless backtrace
        pass
    finally:
        gps.mtk_reset()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment