Skip to content

Instantly share code, notes, and snippets.

@ES-Alexander
Created June 21, 2022 07:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ES-Alexander/d679252763b0b94f23e17a8a17823b5a to your computer and use it in GitHub Desktop.
Save ES-Alexander/d679252763b0b94f23e17a8a17823b5a to your computer and use it in GitHub Desktop.
Minimal example for course tracking of a GPS-enabled MAVLink vehicle, using a low-pass IIR filter for averaging values over time.
#!/usr/bin/env python3
from pymavlink import mavutil, mavlink
class CourseAverager:
MSG_TYPE = 'GLOBAL_POSITION_INT'
def __init__(self, mavlink_connection, request_rate=1, decay_rate=0.95, initial_course=(0, 0)):
self.master = mavlink_connection
self.request_message(request_rate)
self.decay_rate = decay_rate
self._remainder = 1 - decay_rate
self.average_course = list(initial_values)
def request_message(self, request_rate):
if not request_rate:
return
# convert Hz to microsecond period
interval = int(1e6 / request_rate)
# send request
self.master.command_long_send(
self.master.target_system, self.master.target_component,
mavlink.MAV_CMD_SET_MESSAGE_INTERVAL,
0, # confirmation
getattr(mavlink, f'MAVLINK_MSG_ID_{self.MSG_TYPE}'), interval,
0, 0, 0, 0, 0) # *unused, response_target
# TODO: ensure command was received and accepted (per Command Protocol)
# Generally not essential, but doing so makes the program more robust
# (e.g. in case the message was lost on the way)
def update_course(self, message):
for index, axis in enumerate('xy'):
new_velocity = getattr(message, f'v{axis}')
old_average = self.average_course[index]
# Infinite impulse response (IIR) low-pass (averaging) filter
# -> every past sample matters, but recent ones matter more than old ones
# -> minimal memory and calculation overhead
self.average_course[index] = (
old_average * self.decay_rate
+ new_velocity * self._remainder
)
if __name__ == '__main__':
from argparse import ArgumentParser, ArgumentDefaultsHelpFormatter
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument('-p', '--port', default='14660',
help='MAVLink connection specifier (UDP client port)')
parser.add_argument('-r', '--request_rate', default=1, type=float,
help='GPS message request rate in Hertz. Lower -> less CPU usage but more missing data.')
parser.add_argument('--decay_rate', default=0.95, type=float,
help='Proportional decay at each timestep. Smaller -> earlier samples matter less.')
args = parser.parse_args()
master = mavutil.mavlink_connection(f'udpin:0.0.0.0:{args.port}')
master.wait_heartbeat()
course_averager = CourseAverager(master, args.request_rate, args.decay_rate)
output_schedule = mavutil.periodic_event(1/5) # trigger once every 5 seconds
output_schedule.force() # make sure it triggers once at the start
# TODO: create (and use) schedule for sending HEARTBEAT messages, like in
# https://discuss.bluerobotics.com/t/heartbeat/10940/6
# Likely not essential if the only connection to an endpoint, but the MAVLink
# protocol technically requires every component to send a regular heartbeat,
# and some proxy managers may kick silent connections to reduce CPU usage.
while "program running":
if output_schedule.trigger():
print('Course:', course_averager.average_course)
msg = master.recv_match(type=course_averager.MSG_TYPE, blocking=True)
course_averager.update_course(msg)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment