Skip to content

Instantly share code, notes, and snippets.

@vo
Last active May 22, 2020 09:47
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save vo/9c1a790cfd413512b153f928b24bb081 to your computer and use it in GitHub Desktop.
Save vo/9c1a790cfd413512b153f928b24bb081 to your computer and use it in GitHub Desktop.
mavlink packet inspector
from pymavlink import mavutil
import threading
connection = mavutil.mavlink_connection("udp:0.0.0.0:14592")
local = mavutil.mavlink_connection("udpout:127.0.0.1:14593")
def on_msg_from_uas(msg):
buf = msg.get_msgbuf()
local.write(buf)
def on_msg_from_gcs(msg):
buf = msg.get_msgbuf()
connection.write(buf)
def uas_loop():
while True:
try:
msg = connection.recv_msg()
if msg:
msg_type = msg.get_type()
if msg_type.startswith("MISSION_REQUEST"):
print "uas: MISSION_REQUEST seq:", msg.seq
elif msg_type.startswith("MISSION_ITEM"):
print "uas: MISSION_ITEM", msg.seq
elif msg_type.startswith("MISSION") and msg_type != "MISSION_CURRENT":
print "uas:", msg_type
if msg_type.startswith("STATUSTEXT"):
print "uas:", msg.text
on_msg_from_uas(msg)
except Exception as e:
pass
def gcs_loop():
while True:
try:
msg = local.recv_msg()
if msg:
msg_type = msg.get_type()
if msg_type.startswith("MISSION"):
if msg_type == "MISSION_ITEM":
print "gcs:", msg_type, "seq:", msg.seq, "cmd:", msg.command
elif msg_type == "MISSION_COUNT":
print "gcs:", msg_type, "count:", msg.count
else:
print "gcs:", msg_type
if msg_type.startswith("STATUSTEXT"):
print "gcs:", msg.text
on_msg_from_gcs(msg)
except Exception as e:
pass
uas_thread = threading.Thread(target=uas_loop)
gcs_thread = threading.Thread(target=gcs_loop)
uas_thread.start()
gcs_thread.start()
uas_thread.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment