Skip to content

Instantly share code, notes, and snippets.

@jones2126
Last active February 9, 2025 23:32
Show Gist options
  • Save jones2126/fe96dec896f4baf0812cf70a1da192ff to your computer and use it in GitHub Desktop.
Save jones2126/fe96dec896f4baf0812cf70a1da192ff to your computer and use it in GitHub Desktop.
A script for testing the passing of correction data between two SkyTraq GPS units connected via USB to one laptop.
'''
com6tocom10.py
This is a script used for testing passing correction data over USB with two GPS units connected
to one laptop. The script performs these functions:
1. Forward RTCM correction data from my Base GNSS receiver, a SkyTraq PX1125R connected on COM6, to a
Rover GNSS receiver (COM10).
2. Monitor NMEA messages from my Rover, a SkyTraq PX1172RDP, on COM10 and extract RTK Fix status
from $GPGGA messages.
3. Print the RTK Fix status every 5 seconds along with the last $GPGGA message.
'''
import serial
import threading
import time
# Serial port assignments for the two receivers attached to this laptop.
# Base station (PX1125R): the port RTCM correction bytes are read from.
BASE_PORT = "COM6"
# Rover (PX1172RDP): receives the forwarded RTCM and is also read for NMEA output.
ROVER_PORT = "COM10"
# Baud rate that open_serial() applies to every port it opens.
BAUD_RATE = 115200
# Minimum number of seconds between two RTK status printouts in monitor_nmea().
STATUS_INTERVAL = 5
# Shared Rover connection: assigned by forward_rtcm(), polled and read by
# monitor_nmea(). Stays None until the Rover port has been opened successfully.
rover = None
def open_serial(port):
    """Open ``port`` at ``BAUD_RATE`` and report the outcome on stdout.

    Args:
        port: Name of the serial device to open (e.g. ``"COM6"``).

    Returns:
        The open ``serial.Serial`` handle, configured with a 1-second read
        timeout, or ``None`` when pyserial raises ``SerialException``.
    """
    try:
        connection = serial.Serial(port, BAUD_RATE, timeout=1)
    except serial.SerialException as exc:
        print(f"Error opening {port}: {exc}")
        return None
    else:
        # Only reached when the constructor succeeded.
        print(f"Successfully opened {port}.")
        return connection
def forward_rtcm():
    """Relay RTCM correction bytes from the Base port to the Rover port.

    The Rover port is opened first and published through the module-level
    ``rover`` variable, which ``monitor_nmea`` polls before it starts
    reading. The Base port is opened next. If either port cannot be opened
    the function prints a message and returns without forwarding anything.

    Otherwise it loops forever, copying whatever the Base delivers to the
    Rover. Any exception raised while reading or writing is printed, after
    which both ports are closed and the function returns.

    Returns:
        None. Meant to be used as a thread target.
    """
    global rover
    rover = open_serial(ROVER_PORT)  # Rover first: monitor_nmea waits on this global
    base = open_serial(BASE_PORT)

    if base is None or rover is None:
        print("Failed to open one or both ports. Exiting RTCM forwarding.")
        # Release the Base handle if it was the only port that opened;
        # nothing else in this script uses it, so leaving it open would just
        # keep the device locked. An opened Rover is intentionally left
        # alone because monitor_nmea may already be reading from it.
        if base is not None:
            base.close()
        return

    print(f"Forwarding RTCM data from {BASE_PORT} to {ROVER_PORT}...")
    try:
        while True:
            # Returns once 4096 bytes arrived or the port's read timeout
            # (set in open_serial) expired, whichever comes first.
            data = base.read(4096)
            if data:
                rover.write(data)  # Pass the corrections straight through
                time.sleep(0.01)  # Short delay to prevent overloading buffer
    except Exception as e:
        # Thread boundary: report the failure instead of dying silently.
        print(f"Error in RTCM forwarding: {e}")
    finally:
        # Nested so the Rover is still closed if closing the Base raises.
        try:
            base.close()
        finally:
            rover.close()
def monitor_nmea():
    """Read NMEA lines from the Rover and periodically print its RTK status.

    Polls the module-level ``rover`` once per second until it is no longer
    ``None`` (it is assigned by ``forward_rtcm``), then reads lines from it
    in an endless loop. Lines that start with ``$GPGGA`` are remembered; when
    at least ``STATUS_INTERVAL`` seconds have passed since the previous
    report, the fix-quality, satellite-count and altitude fields of the
    current sentence are printed together with the full sentence.

    A ``$GPGGA`` line with too few comma-separated fields (for example a
    sentence cut short by the read timeout) is skipped rather than allowed
    to raise ``IndexError`` and end monitoring.

    Any other exception raised while reading is printed and ends the loop.

    Returns:
        None. Meant to be used as a thread target.
    """
    print("Waiting for Rover (COM10) to be ready...")
    while rover is None:  # Wait until `forward_rtcm` opens COM10
        time.sleep(1)
    print(f"Listening to NMEA data from {ROVER_PORT}...")

    # GGA fix-quality code (field 6) -> human-readable label.
    # Built once here instead of on every report.
    fix_labels = {
        "0": "No Fix",
        "1": "GPS Fix",
        "2": "DGPS Fix",
        "4": "RTK Fixed",
        "5": "RTK Float",
    }
    # Highest field index read below is 9 (altitude), so 10 fields are needed.
    min_gga_fields = 10

    last_status_time = time.time()
    last_gga_message = None  # Most recent GGA sentence seen
    try:
        while True:
            line = rover.readline().decode(errors='ignore').strip()
            if not line.startswith("$GPGGA"):
                continue
            last_gga_message = line

            current_time = time.time()
            if current_time - last_status_time < STATUS_INTERVAL:
                continue  # Not time for the next report yet

            fields = line.split(',')
            if len(fields) < min_gga_fields:
                # Truncated sentence: skip it and report on the next complete
                # one. The timer is not reset, so the report is only delayed.
                continue

            fix_type = fields[6]
            num_satellites = fields[7]
            altitude = fields[9]
            status = fix_labels.get(fix_type, "Unknown")
            print(f"[{time.strftime('%H:%M:%S')}] RTK Status: {status} | Satellites: {num_satellites} | Altitude: {altitude}m")
            print(f"Last GGA Message: {last_gga_message}")  # Print full GGA message
            last_status_time = current_time  # Reset timer
    except Exception as e:
        # Thread boundary: report the failure instead of dying silently.
        print(f"Error in NMEA monitoring: {e}")
# Launch the RTCM forwarder and the NMEA monitor side by side as daemon
# threads, forwarder first.
rtcm_thread, nmea_thread = (
    threading.Thread(target=worker, daemon=True)
    for worker in (forward_rtcm, monitor_nmea)
)
rtcm_thread.start()
nmea_thread.start()

# Park the main thread so the daemon workers keep running; Ctrl+C is caught
# here and lets the script fall off the end.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("\nExiting...")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment