Skip to content

Instantly share code, notes, and snippets.

@gamblor21
Created June 17, 2022 03:34
Show Gist options
  • Save gamblor21/e9681057222f6a70fca0608ae3054b3e to your computer and use it in GitHub Desktop.
Save gamblor21/e9681057222f6a70fca0608ae3054b3e to your computer and use it in GitHub Desktop.
CircuitPython GPS Async testing
import time
import board
import busio
import asyncio
import adafruit_gps
import gps_asyncio
import UART_asyncio
# switch to 115200bps if we are at the default 9600 at startup
uart = busio.UART(board.TX, board.RX, baudrate=9600, timeout=10)
gps = adafruit_gps.GPS(uart, debug=False) # Use UART/pyserial
gps.send_command(b"PMTK251,115200")
uart.deinit()
#uart = busio.UART(board.TX, board.RX, baudrate=115200, receiver_buffer_size=500, timeout=10)
uart = UART_asyncio.UART_asyncio(board.TX, board.RX, baudrate=115200, receiver_buffer_size=500, timeout=10)
uart.reset_input_buffer()
# Create a GPS module instance.
#gps = adafruit_gps.GPS(uart, debug=False) # Use UART/pyserial
gps = gps_asyncio.GPS_asyncio(uart, debug=False) # Use UART/pyserial
gps.send_command(b"PMTK313,1") # SBAS Enable
gps.send_command(b"PMTK301,2") # DGPS Mode WAAS
gps.send_command(b"PMTK386,0") # do not update position if m/s less
# Turn on the basic GGA and RMC info (what you typically want)
gps.send_command(b"PMTK314,0,10,0,10,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0")
# Set update rate to 10hz
gps.send_command(b"PMTK220,100")
print("Go")
async def gps_update():
while True:
await gps.update()
async def display_update():
while True:
if gps.has_fix is False:
await asyncio.sleep(0)
continue
print("Lat: {0:.6f} degrees Lon: {1:.6f} degrees".format(gps.latitude, gps.longitude))
print("Lat: {:2.}.{} degrees Lon: {:2.}.{} degrees".format(gps.latitude_degrees, str(gps.latitude_minutes/100)[2:], gps.longitude_degrees, str(gps.longitude_minutes/100)[2:]))
gps_speed = gps.speed_knots
if gps_speed is None:
gps_speed = 0.0
gps_speed *= 1.852
gps_course = gps.track_angle_deg
if gps_course is None:
gps_course = 0
print("Speed: {} kph Course: {} degrees".format(gps_speed, gps_course))
if gps.satellites is not None:
print("# satellites: {} fix: {} hdil: {}".format(gps.satellites, gps.fix_quality, gps.horizontal_dilution))
await asyncio.sleep(1.0)
async def main():
print("Creating tasks")
update_task = asyncio.create_task(gps_update())
display_task = asyncio.create_task(display_update())
while True:
print(".", end='') # to see that something else is running between prints and gps.update()
await asyncio.sleep(0.1)
print("Done")
asyncio.run(main())
There are two main files here to give GPS asyncio ability.
UART_asyncio - wrap the core busio.UART read and readline methods with methods that will not block.
I am not sure how many core classes this type of wrapping would work for. But for UART seemed fine.
Things could be expanded more (write only writes X bytes before letting something else go). CPython's
serial_asyncio is somewhat close to what I'm doing, though CPython has Transport and Protocols with asyncoio
that CP does not have.
gps_asyncio - Right now a subclass of the adafruit_gps.GPS class but as someone suggested may be better as a
fork project as a lot of code had to be copied so not really OO. In theory could refactor the GPS class too
so the non-async methods all have their own functions and you can override just the async ones. But not even sure
that would work. To make gps.update() async it then went to _parse_message(), _read_message() and readline() that
all had to be modified. And then make sure the new class gets a UART_asyncio class or the readline just blocks
anyways (and then why bother). You could rewrite GPS to use UART.read() and .in_waiting only and asyncio.sleep(0.0001)
if nothing is waiting. I just did UART fully async to learn.
code.py - simple GPS example based on the adafruit_gps example file I was pulling apart for my own use. Simplified it
as an example here.
# good stuff starts about line 180
import adafruit_gps
import asyncio
_GPSI2C_DEFAULT_ADDRESS = const(0x10)
_GLL = 0
_RMC = 1
_GGA = 2
_GSA = 3
_GSA_4_11 = 4
_GSV7 = 5
_GSV11 = 6
_GSV15 = 7
_GSV19 = 8
_RMC_4_1 = 9
_ST_MIN = _GLL
_ST_MAX = _RMC_4_1
_SENTENCE_PARAMS = (
# 0 - _GLL
"dcdcscC",
# 1 - _RMC
"scdcdcffsDCC",
# 2 - _GGA
"sdcdciiffsfsIS",
# 3 - _GSA
"ciIIIIIIIIIIIIfff",
# 4 - _GSA_4_11
"ciIIIIIIIIIIIIfffS",
# 5 - _GSV7
"iiiiiiI",
# 6 - _GSV11
"iiiiiiIiiiI",
# 7 - _GSV15
"iiiiiiIiiiIiiiI",
# 8 - _GSV19
"iiiiiiIiiiIiiiIiiiI",
# 9 - _RMC_4_1
"scdcdcffsDCCC",
)
# Internal helper parsing functions.
# These handle input that might be none or null and return none instead of
# throwing errors.
def _parse_degrees(nmea_data):
# Parse a NMEA lat/long data pair 'dddmm.mmmm' into a pure degrees value.
# Where ddd is the degrees, mm.mmmm is the minutes.
if nmea_data is None or len(nmea_data) < 3:
return None
# To avoid losing precision handle degrees and minutes separately
# Return the final value as an integer. Further functions can parse
# this into a float or separate parts to retain the precision
raw = nmea_data.split(".")
degrees = int(raw[0]) // 100 * 1000000 # the ddd
minutes = int(raw[0]) % 100 # the mm.
minutes += int(f"{raw[1][:4]:0<4}") / 10000
minutes = int(minutes / 60 * 1000000)
return degrees + minutes # return parsed string in the format dddmmmmmm
def _parse_int(nmea_data):
if nmea_data is None or nmea_data == "":
return None
return int(nmea_data)
def _parse_float(nmea_data):
if nmea_data is None or nmea_data == "":
return None
return float(nmea_data)
def _parse_str(nmea_data):
if nmea_data is None or nmea_data == "":
return None
return str(nmea_data)
def _read_degrees(data, index, neg):
# This function loses precision with float32
x = data[index] / 1000000
if data[index + 1].lower() == neg:
x *= -1.0
return x
def _read_int_degrees(data, index, neg):
deg = data[index] // 1000000
minutes = data[index] % 1000000 / 10000
if data[index + 1].lower() == neg:
deg *= -1
return (deg, minutes)
def _parse_talker(data_type):
# Split the data_type into talker and sentence_type
if data_type[:1] == b"P": # Proprietary codes
return (data_type[:1], data_type[1:])
return (data_type[:2], data_type[2:])
def _parse_data(sentence_type, data):
"""Parse sentence data for the specified sentence type and
return a list of parameters in the correct format, or return None.
"""
# pylint: disable=too-many-branches
if not _ST_MIN <= sentence_type <= _ST_MAX:
# The sentence_type is unknown
return None
param_types = _SENTENCE_PARAMS[sentence_type]
if len(param_types) != len(data):
# The expected number does not match the number of data items
return None
params = []
try:
for i, dti in enumerate(data):
pti = param_types[i]
len_dti = len(dti)
nothing = dti is None or len_dti == 0
if pti == "c":
# A single character
if len_dti != 1:
return None
params.append(dti)
elif pti == "C":
# A single character or Nothing
if nothing:
params.append(None)
elif len_dti != 1:
return None
else:
params.append(dti)
elif pti == "d":
# A number parseable as degrees
params.append(_parse_degrees(dti))
elif pti == "D":
# A number parseable as degrees or Nothing
if nothing:
params.append(None)
else:
params.append(_parse_degrees(dti))
elif pti == "f":
# A floating point number
params.append(_parse_float(dti))
elif pti == "i":
# An integer
params.append(_parse_int(dti))
elif pti == "I":
# An integer or Nothing
if nothing:
params.append(None)
else:
params.append(_parse_int(dti))
elif pti == "s":
# A string
params.append(dti)
elif pti == "S":
# A string or Nothing
if nothing:
params.append(None)
else:
params.append(dti)
else:
raise TypeError(f"GPS: Unexpected parameter type '{pti}'")
except ValueError:
# Something didn't parse, abort
return None
# Return the parsed data
return params
class GPS_asyncio(adafruit_gps.GPS):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
async def read(self, num_bytes):
"""Read up to num_bytes of data from the GPS directly, without parsing.
Returns a bytearray with up to num_bytes or None if nothing was read"""
return await self._uart.read(num_bytes)
async def readline(self):
"""Returns a newline terminated bytearray, must have timeout set for
the underlying UART or this will block forever!"""
return await self._uart.readline()
async def _read_sentence(self):
# Parse any NMEA sentence that is available.
# pylint: disable=len-as-condition
# This needs to be refactored when it can be tested.
sentence = await self.readline()
if sentence is None or sentence == b"" or len(sentence) < 1:
return None
try:
sentence = str(sentence, "ascii").strip()
except UnicodeError:
return None
# Look for a checksum and validate it if present.
if len(sentence) > 7 and sentence[-3] == "*":
# Get included checksum, then calculate it and compare.
expected = int(sentence[-2:], 16)
actual = 0
for i in range(1, len(sentence) - 3):
actual ^= ord(sentence[i])
if actual != expected:
return None # Failed to validate checksum.
# copy the raw sentence
self._raw_sentence = sentence
return sentence
# At this point we don't have a valid sentence
return None
async def _parse_sentence(self):
sentence = await self._read_sentence()
# sentence is a valid NMEA with a valid checksum
if sentence is None:
return None
# Remove checksum once validated.
sentence = sentence[:-3]
# Parse out the type of sentence (first string after $ up to comma)
# and then grab the rest as data within the sentence.
delimiter = sentence.find(",")
if delimiter == -1:
return None # Invalid sentence, no comma after data type.
data_type = sentence[1:delimiter]
return (data_type, sentence[delimiter + 1 :])
async def update(self):
"""Check for updated data from the GPS module and process it
accordingly. Returns True if new data was processed, and False if
nothing new was received.
"""
# Grab a sentence and check its data type to call the appropriate
# parsing function.
try:
sentence = await self._parse_sentence()
except UnicodeError:
return None
if sentence is None:
return False
if self.debug:
print(sentence)
data_type, args = sentence
if len(data_type) < 5:
return False
data_type = bytes(data_type.upper(), "ascii")
(talker, sentence_type) = _parse_talker(data_type)
# Check for all currently known GNSS talkers
# GA - Galileo
# GB - BeiDou Systems
# GI - NavIC
# GL - GLONASS
# GP - GPS
# GQ - QZSS
# GN - GNSS / More than one of the above
if talker not in (b"GA", b"GB", b"GI", b"GL", b"GP", b"GQ", b"GN"):
# It's not a known GNSS source of data
# Assume it's a valid packet anyway
return True
result = True
args = args.split(",")
if sentence_type == b"GLL": # Geographic position - Latitude/Longitude
result = self._parse_gll(args)
elif sentence_type == b"RMC": # Minimum location info
result = self._parse_rmc(args)
elif sentence_type == b"GGA": # 3D location fix
result = self._parse_gga(args)
elif sentence_type == b"GSV": # Satellites in view
result = self._parse_gsv(talker, args)
elif sentence_type == b"GSA": # GPS DOP and active satellites
result = self._parse_gsa(talker, args)
return result
import asyncio
import busio
class UART_asyncio(busio.UART):
def __init__(self, *args, **kwargs):
self._poll_wait = 0.005
super().__init__(*args, **kwargs)
async def read(self, nbytes):
to_go = nbytes
buffer = bytearray(0)
while to_go != 0:
to_read = min(to_go, self.in_waiting)
if to_read != 0:
buffer.extend(super().read(to_read))
to_go -= to_read
else:
await asyncio.sleep(self._poll_wait)
return bytes(buffer)
async def readline(self):
buffer = bytearray(0)
b = ''
while b != b'\n':
b = await self.read(1)
buffer.extend(b)
return bytes(buffer)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment