Skip to content

Instantly share code, notes, and snippets.

@nobrowser
Created March 20, 2024 02:53
Show Gist options
  • Save nobrowser/285d63410b9c2aec77a53587020cf2b6 to your computer and use it in GitHub Desktop.
Save nobrowser/285d63410b9c2aec77a53587020cf2b6 to your computer and use it in GitHub Desktop.
A daemon that reads data from an Adafruit/MediaTek GPS module (over I2C) and feeds it to chrony through its SOCK refclock driver
import board
import adafruit_gps
from struct import Struct
from time import time, sleep
from calendar import timegm
import sys
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
from socket import socket, socketpair, AF_UNIX, SOCK_DGRAM
import json
from dataclasses import dataclass, asdict, astuple
import os
from statistics import mean, stdev, median
from math import floor, isnan
# Wire layout of one chrony SOCK refclock sample: a struct timeval
# (tv_sec, tv_usec), a double offset, then four ints ending with the magic.
# The timeval members are C longs on Linux, so they are packed with native
# "l": 4 bytes each on 32-bit builds (same bytes as the former "ii") and
# 8 bytes each on 64-bit builds, where "ii" produced a 32-byte packet
# instead of the 40 bytes the C struct occupies.
REFCLOCK_SOCK_FORMAT = "@lldiiii"
# Protocol identifier carried in the last field; the bytes spell ASCII "SOCK"
REFCLOCK_SOCK_MAGIC = 0x534f434b
# Sentence selection: the 1 bit turns on RMC, every other sentence stays off
GPS_SELECT_CMD = "PMTK314,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0"
# PMTK220 takes the update interval in milliseconds; 1000 is once per second
GPS_HZ_CMD = "PMTK220,1000"
# Same selection command with every field zero; sent when the daemon exits
GPS_RESET_CMD = "PMTK314,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0"
# Default path of the datagram socket the samples are sent to
CHRONY = "/run/mtk_refclock/mtk_refclock.sock"
# Removal of this file serves as a signal to exit
SEMA = "/run/mtk_refclock/semaphore"
@dataclass
class RefclockData:
    """One refclock sample, with fields declared in wire order.

    ``astuple`` of an instance yields the values in exactly the order
    the struct format packs them, and ``asdict`` gives the JSON form
    printed in dry-run mode.
    """

    # System time of the measurement: whole seconds, then microseconds
    tv_sec: int
    tv_usec: int

    # Offset value reported for this measurement, in seconds
    offset: float

    # Three int slots that are always sent as zero (presumably chrony's
    # pulse, leap and padding fields -- confirm against the SOCK protocol)
    z1: int = 0
    z2: int = 0
    z3: int = 0

    # Protocol identifier expected in the final slot
    magic: int = REFCLOCK_SOCK_MAGIC
# Precompiled packer for the sample layout described by REFCLOCK_SOCK_FORMAT;
# built once at import time and reused for every packet
formatter = Struct(REFCLOCK_SOCK_FORMAT)
def frac_part(f: float):
    """Return the part of *f* above ``floor(f)``.

    Because the reference point is the floor, the result is never
    negative: ``frac_part(-1.25)`` is ``0.75``.
    """
    whole = float(floor(f))
    return f - whole
def milli(f: float):
    """Return the fractional part of *f* as a whole number of microseconds.

    Despite the name, the scale factor is one million, not one thousand;
    the result is floored to an int.
    """
    fraction = frac_part(f)
    return floor(fraction * 1_000_000.0)
def is_not_none(a):
    """Return True for every value except ``None`` (falsy values included)."""
    return not (a is None)
# GPS driver subclass that remembers the system time at which each sentence
# started arriving, so GPS time can be compared against the system clock
class MtkGPS(adafruit_gps.GPS_GtopI2C):
    """I2C GPS that timestamps incoming sentences with the system clock.

    Attributes:
        last_systime: ``time()`` taken right after the first byte of the
            most recent sentence was read, or None before any read.
        last_datetime: GPS datetime of the last update reported as new.
        updated: False until ``update`` has been called once.
    """

    def __init__(self):
        """Open the board's default I2C bus and initialise the bookkeeping."""
        super().__init__(board.I2C())
        self.last_systime = None
        self.last_datetime = None
        # file descriptor of the underlying I2C device, wrapped in a list
        self.rlist = [self._i2c.i2c._i2c._i2c_bus._device.fileno()]
        self.updated = False

    def send_string(self, s: str):
        """Send the command text *s* to the module, encoded as ASCII."""
        self.send_command(s.encode("ascii"))

    def mtk_setup(self):
        """Select the sentences to emit, pause briefly, then set the rate."""
        self.send_string(GPS_SELECT_CMD)
        sleep(0.5)
        self.send_string(GPS_HZ_CMD)

    def mtk_reset(self):
        """Send the all-zero sentence selection command."""
        self.send_string(GPS_RESET_CMD)

    def readline(self):
        """Read one line, stamping the arrival time of its first byte.

        Polls one byte at a time until something arrives, records the
        system time immediately afterwards (hoped to be a decent
        approximation of the actual second boundary), queues that byte
        and lets the parent class read the rest of the line.
        """
        first = self.read(1)
        while not first:
            first = self.read(1)
        self.last_systime = time()
        self._internalbuffer.append(first[0])
        return super().readline()

    def update(self):
        """Poll for a sentence; return True only when GPS time has advanced.

        Every call after the first waits half a second before polling.
        An update is reported only if the parent parsed a sentence and the
        resulting datetime is set and differs from the last one reported.
        """
        if self.updated:
            sleep(0.500)
        else:
            self.updated = True
        if not super().update():
            return False
        current = self.datetime
        # no new information unless the GPS clock has actually stepped
        if not current or current == self.last_datetime:
            return False
        self.last_datetime = current
        return True

    def get_offset(self):
        """Return system arrival time minus GPS time, in seconds."""
        # NOTE(review): chrony's SOCK protocol describes its offset field as
        # true time minus system time; this is system minus GPS -- confirm
        # the sign is what the caller intends.
        gps_seconds = timegm(self.datetime)
        return self.last_systime - gps_seconds
# there is a systematic bias in the offset values which must be corrected
# or else chrony will reject our source. This is how we do it
class DelayStats:
    """Fixed-size ring buffer of recent delay samples with robust statistics.

    The window holds the last ``max_delays`` samples; unused slots are
    ``None``.  All statistics are computed over the samples collected so
    far, so they can be queried before the window is full.

    Attributes:
        max_delays: capacity of the window.
        delays: the ring buffer itself (``None`` marks an empty slot).
        next_delay: index of the slot the next sample will overwrite.
        mean_delay: cached mean, or None when it must be recomputed.
    """

    def __init__(self, max_delays: int):
        """Create an empty window.

        Raises:
            ValueError: if *max_delays* is smaller than 1, since the ring
                buffer could not hold a single sample.
        """
        if max_delays < 1:
            raise ValueError(f"max_delays must be at least 1, got {max_delays}")
        self.max_delays = max_delays
        self.delays = [None] * max_delays
        self.next_delay = 0
        self.mean_delay = None
        self._stdev_delay = None

    def _samples(self):
        """Return the collected samples, leaving out the empty slots."""
        return [d for d in self.delays if d is not None]

    def add_delay(self, delay: float):
        """Store *delay* over the oldest slot; NaN values are ignored."""
        if isnan(delay):
            return
        self.delays[self.next_delay] = delay
        self.next_delay = (self.next_delay + 1) % self.max_delays
        # the window changed, so both cached statistics are stale
        self.mean_delay = None
        self._stdev_delay = None

    def is_ready(self):
        """Return True once every slot of the window holds a sample."""
        return all(d is not None for d in self.delays)

    def mean(self):
        """Return the mean of the collected samples (cached until the next add).

        Raises:
            statistics.StatisticsError: if no sample has been collected.
        """
        if self.mean_delay is None:
            self.mean_delay = mean(self._samples())
        return self.mean_delay

    def stdev(self):
        """Return the sample standard deviation (cached until the next add).

        Raises:
            statistics.StatisticsError: with fewer than two samples.
        """
        if self._stdev_delay is None:
            self._stdev_delay = stdev(self._samples(), xbar=self.mean())
        return self._stdev_delay

    def stable_median(self):
        """Return the median of the samples within two deviations of the mean."""
        m, d = self.mean(), self.stdev()
        low, high = m - 2.0 * d, m + 2.0 * d
        return median(x for x in self._samples() if low <= x <= high)
def _next_sample(gps, stats, correction):
    """Poll the GPS once; return a RefclockData or None if nothing to send."""
    if not gps.update():
        return None
    systime, offset = gps.last_systime, gps.get_offset()
    stats.add_delay(offset)
    # choose stdev bound so all delays we use are < 100ms absolute
    if not stats.is_ready() or stats.stdev() > 0.050:
        return None
    tv_sec, tv_usec = floor(systime), milli(systime)
    return RefclockData(tv_sec, tv_usec, stats.stable_median() - correction)


def main_loop(argv, gps):
    """Feed GPS samples to the refclock socket until told to stop.

    Each iteration polls the GPS, adds the measured offset to a sliding
    window, and once the window is full and stable emits one sample whose
    offset is the window's outlier-free median minus the configured
    correction.

    Args:
        argv: parsed options providing ``max_delays``, ``socket``,
            ``correction`` (milliseconds) and ``not_really``.
        gps: object providing ``update()``, ``last_systime`` and
            ``get_offset()``.

    With ``not_really`` set, samples are printed to stdout as JSON, the
    socket is never connected, and the loop runs until interrupted.
    Otherwise the loop ends as soon as the semaphore file is gone; this is
    checked after every poll, so the exit signal is honoured even while
    the GPS has no fix or the window is not yet stable.
    """
    dry_run = argv.not_really
    # the correction is constant for the whole run, so convert it once
    correction = float(argv.correction) / 1_000.0
    stats = DelayStats(argv.max_delays)
    with socket(AF_UNIX, SOCK_DGRAM) as sock:
        if not dry_run:
            sock.connect(argv.socket)
        while True:
            rd = _next_sample(gps, stats, correction)
            if rd is not None:
                if dry_run:
                    sys.stdout.write(json.dumps(asdict(rd)) + "\n")
                else:
                    try:
                        sock.sendall(formatter.pack(*astuple(rd)))
                    except Exception as e:
                        # best effort: report the failure and keep running
                        sys.stderr.write(f"{e}\n")
            if not dry_run and not os.access(SEMA, os.F_OK):
                break
def main():
    """Parse the command line, configure the GPS and run the feed loop.

    The GPS is always sent the reset command on the way out, including
    when setup or the loop is interrupted from the keyboard.  A sample
    window smaller than two is rejected with a usage error, because the
    standard deviation used to judge the window needs two data points.
    """
    argp = ArgumentParser(
        prog="mtk_refclock",
        description="Simple GPS refclock",
        formatter_class=ArgumentDefaultsHelpFormatter,
    )
    argp.add_argument(
        "-n", "--not_really",
        action="store_true",
        help="Run a fake clock printing packets to stdout",
    )
    argp.add_argument(
        "-s", "--socket",
        metavar="PATH",
        default=CHRONY,
        help="Send packets to socket at PATH",
    )
    argp.add_argument(
        "-m", "--max_delays",
        metavar="N",
        type=int,
        default=60,
        help="Collect N samples to determine fractional second delay",
    )
    argp.add_argument(
        "-c", "--correction",
        metavar="N",
        type=int,
        default=0,
        help="Add a constant correction of N milliseconds",
    )
    argv = argp.parse_args()
    if argv.max_delays < 2:
        # fail with a usage message instead of crashing inside the loop
        argp.error("--max_delays must be at least 2")
    gps = MtkGPS()
    try:
        # setup sits inside the try so an interrupt here still resets the GPS
        gps.mtk_setup()
        main_loop(argv, gps)
    except KeyboardInterrupt:
        # this is just to avoid the pointless backtrace
        pass
    finally:
        gps.mtk_reset()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment