Last active
May 2, 2022 09:37
-
-
Save alehed/ab70682d44fe4149a974a79d7f16d802 to your computer and use it in GitHub Desktop.
mavlink_python output comparison (shows changes in whitespace and quotes)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
MAVLink protocol implementation (auto-generated by mavgen.py) | |
Generated from: common.xml | |
Note: this file has been auto-generated. DO NOT EDIT | |
""" | |
from __future__ import print_function | |
import array | |
import hashlib | |
import json | |
import os | |
import platform | |
import struct | |
import sys | |
import time | |
from builtins import object, range | |
from pymavlink.generator.mavcrc import x25crc | |
WIRE_PROTOCOL_VERSION = "2.0" | |
DIALECT = "after" | |
PROTOCOL_MARKER_V1 = 0xFE | |
PROTOCOL_MARKER_V2 = 0xFD | |
HEADER_LEN_V1 = 6 | |
HEADER_LEN_V2 = 10 | |
MAVLINK_SIGNATURE_BLOCK_LEN = 13 | |
MAVLINK_IFLAG_SIGNED = 0x01 | |
# Not yet supported on other dialects | |
native_supported = platform.system() != "Windows" | |
# Will force use of native code regardless of what client app wants | |
native_force = "MAVNATIVE_FORCE" in os.environ | |
# Will force both native and legacy code to be used and their results compared | |
native_testing = "MAVNATIVE_TESTING" in os.environ | |
if native_supported and float(WIRE_PROTOCOL_VERSION) <= 1: | |
try: | |
import mavnative | |
except ImportError: | |
print("ERROR LOADING MAVNATIVE - falling back to python implementation") | |
native_supported = False | |
else: | |
# mavnative isn't supported for MAVLink2 yet | |
native_supported = False | |
# allow MAV_IGNORE_CRC=1 to ignore CRC, allowing some | |
# corrupted msgs to be seen | |
MAVLINK_IGNORE_CRC = os.environ.get("MAV_IGNORE_CRC", 0) | |
# some base types from mavlink_types.h | |
MAVLINK_TYPE_CHAR = 0 | |
MAVLINK_TYPE_UINT8_T = 1 | |
MAVLINK_TYPE_INT8_T = 2 | |
MAVLINK_TYPE_UINT16_T = 3 | |
MAVLINK_TYPE_INT16_T = 4 | |
MAVLINK_TYPE_UINT32_T = 5 | |
MAVLINK_TYPE_INT32_T = 6 | |
MAVLINK_TYPE_UINT64_T = 7 | |
MAVLINK_TYPE_INT64_T = 8 | |
MAVLINK_TYPE_FLOAT = 9 | |
MAVLINK_TYPE_DOUBLE = 10 | |
# swiped from DFReader.py | |
def to_string(s): | |
"""desperate attempt to convert a string regardless of what garbage we get""" | |
try: | |
return s.decode("utf-8") | |
except Exception as e: | |
pass | |
try: | |
s2 = s.encode("utf-8", "ignore") | |
x = u"%s" % s2 | |
return s2 | |
except Exception: | |
pass | |
# so it's a nasty one. Let's grab as many characters as we can | |
r = "" | |
try: | |
for c in s: | |
r2 = r + c | |
r2 = r2.encode("ascii", "ignore") | |
x = u"%s" % r2 | |
r = r2 | |
except Exception: | |
pass | |
return r + "_XXX" | |
class MAVLink_header(object): | |
"""MAVLink message header""" | |
def __init__(self, msgId, incompat_flags=0, compat_flags=0, mlen=0, seq=0, srcSystem=0, srcComponent=0): | |
self.mlen = mlen | |
self.seq = seq | |
self.srcSystem = srcSystem | |
self.srcComponent = srcComponent | |
self.msgId = msgId | |
self.incompat_flags = incompat_flags | |
self.compat_flags = compat_flags | |
def pack(self, force_mavlink1=False): | |
if WIRE_PROTOCOL_VERSION == "2.0" and not force_mavlink1: | |
return struct.pack( | |
"<BBBBBBBHB", | |
253, | |
self.mlen, | |
self.incompat_flags, | |
self.compat_flags, | |
self.seq, | |
self.srcSystem, | |
self.srcComponent, | |
self.msgId & 0xFFFF, | |
self.msgId >> 16, | |
) | |
return struct.pack( | |
"<BBBBBB", | |
PROTOCOL_MARKER_V1, | |
self.mlen, | |
self.seq, | |
self.srcSystem, | |
self.srcComponent, | |
self.msgId, | |
) | |
class MAVLink_message(object): | |
"""base MAVLink message class""" | |
def __init__(self, msgId, name): | |
self._header = MAVLink_header(msgId) | |
self._payload = None | |
self._msgbuf = None | |
self._crc = None | |
self._fieldnames = [] | |
self._type = name | |
self._signed = False | |
self._link_id = None | |
self._instances = None | |
self._instance_field = None | |
def format_attr(self, field): | |
"""override field getter""" | |
raw_attr = getattr(self, field) | |
if isinstance(raw_attr, bytes): | |
raw_attr = to_string(raw_attr).rstrip("\00") | |
return raw_attr | |
def get_msgbuf(self): | |
if isinstance(self._msgbuf, bytearray): | |
return self._msgbuf | |
return bytearray(self._msgbuf) | |
def get_header(self): | |
return self._header | |
def get_payload(self): | |
return self._payload | |
def get_crc(self): | |
return self._crc | |
def get_fieldnames(self): | |
return self._fieldnames | |
def get_type(self): | |
return self._type | |
def get_msgId(self): | |
return self._header.msgId | |
def get_srcSystem(self): | |
return self._header.srcSystem | |
def get_srcComponent(self): | |
return self._header.srcComponent | |
def get_seq(self): | |
return self._header.seq | |
def get_signed(self): | |
return self._signed | |
def get_link_id(self): | |
return self._link_id | |
def __str__(self): | |
ret = "%s {" % self._type | |
for a in self._fieldnames: | |
v = self.format_attr(a) | |
ret += "%s : %s, " % (a, v) | |
ret = ret[0:-2] + "}" | |
return ret | |
def __ne__(self, other): | |
return not self.__eq__(other) | |
def __eq__(self, other): | |
if other is None: | |
return False | |
if self.get_type() != other.get_type(): | |
return False | |
# We do not compare CRC because native code doesn't provide it | |
# if self.get_crc() != other.get_crc(): | |
# return False | |
if self.get_seq() != other.get_seq(): | |
return False | |
if self.get_srcSystem() != other.get_srcSystem(): | |
return False | |
if self.get_srcComponent() != other.get_srcComponent(): | |
return False | |
for a in self._fieldnames: | |
if self.format_attr(a) != other.format_attr(a): | |
return False | |
return True | |
def to_dict(self): | |
d = dict({}) | |
d["mavpackettype"] = self._type | |
for a in self._fieldnames: | |
d[a] = self.format_attr(a) | |
return d | |
def to_json(self): | |
return json.dumps(self.to_dict()) | |
def sign_packet(self, mav): | |
h = hashlib.new("sha256") | |
self._msgbuf += struct.pack("<BQ", mav.signing.link_id, mav.signing.timestamp)[:7] | |
h.update(mav.signing.secret_key) | |
h.update(self._msgbuf) | |
sig = h.digest()[:6] | |
self._msgbuf += sig | |
mav.signing.timestamp += 1 | |
def pack(self, mav, crc_extra, payload, force_mavlink1=False): | |
plen = len(payload) | |
if WIRE_PROTOCOL_VERSION != "1.0" and not force_mavlink1: | |
# in MAVLink2 we can strip trailing zeros off payloads. This allows for simple | |
# variable length arrays and smaller packets | |
nullbyte = chr(0) | |
# in Python2, type("fred") is str but also type("fred")==bytes | |
if str(type(payload)) == "<class 'bytes'>": | |
nullbyte = 0 | |
while plen > 1 and payload[plen - 1] == nullbyte: | |
plen -= 1 | |
self._payload = payload[:plen] | |
incompat_flags = 0 | |
if mav.signing.sign_outgoing: | |
incompat_flags |= MAVLINK_IFLAG_SIGNED | |
self._header = MAVLink_header( | |
self._header.msgId, | |
incompat_flags=incompat_flags, | |
compat_flags=0, | |
mlen=len(self._payload), | |
seq=mav.seq, | |
srcSystem=mav.srcSystem, | |
srcComponent=mav.srcComponent, | |
) | |
self._msgbuf = self._header.pack(force_mavlink1=force_mavlink1) + self._payload | |
crc = x25crc(self._msgbuf[1:]) | |
if True: # using CRC extra | |
crc.accumulate_str(struct.pack("B", crc_extra)) | |
self._crc = crc.crc | |
self._msgbuf += struct.pack("<H", self._crc) | |
if mav.signing.sign_outgoing and not force_mavlink1: | |
self.sign_packet(mav) | |
return self._msgbuf | |
def __getitem__(self, key): | |
"""support indexing, allowing for multi-instance sensors in one message""" | |
if self._instances is None: | |
raise IndexError() | |
if not key in self._instances: | |
raise IndexError() | |
return self._instances[key] | |
# enums | |
class EnumEntry(object): | |
def __init__(self, name, description): | |
self.name = name | |
self.description = description | |
self.param = {} | |
enums = {} | |
# MAV_AUTOPILOT | |
enums["MAV_AUTOPILOT"] = {} | |
MAV_AUTOPILOT_GENERIC = 0 # Generic autopilot, full support for everything | |
enums["MAV_AUTOPILOT"][0] = EnumEntry("MAV_AUTOPILOT_GENERIC", """Generic autopilot, full support for everything""") | |
MAV_AUTOPILOT_RESERVED = 1 # Reserved for future use. | |
enums["MAV_AUTOPILOT"][1] = EnumEntry("MAV_AUTOPILOT_RESERVED", """Reserved for future use.""") | |
MAV_AUTOPILOT_SLUGS = 2 # SLUGS autopilot, http://slugsuav.soe.ucsc.edu | |
enums["MAV_AUTOPILOT"][2] = EnumEntry("MAV_AUTOPILOT_SLUGS", """SLUGS autopilot, http://slugsuav.soe.ucsc.edu""") | |
MAV_AUTOPILOT_AEROB = 16 # Aerob -- http://aerob.ru | |
enums["MAV_AUTOPILOT"][16] = EnumEntry("MAV_AUTOPILOT_AEROB", """Aerob -- http://aerob.ru""") | |
MAV_AUTOPILOT_ASLUAV = 17 # ASLUAV autopilot -- http://www.asl.ethz.ch | |
enums["MAV_AUTOPILOT"][17] = EnumEntry("MAV_AUTOPILOT_ASLUAV", """ASLUAV autopilot -- http://www.asl.ethz.ch""") | |
MAV_AUTOPILOT_ENUM_END = 18 | |
enums["MAV_AUTOPILOT"][18] = EnumEntry("MAV_AUTOPILOT_ENUM_END", """""") | |
# MAV_TYPE | |
enums["MAV_TYPE"] = {} | |
MAV_TYPE_GENERIC = 0 # Generic micro air vehicle. | |
enums["MAV_TYPE"][0] = EnumEntry("MAV_TYPE_GENERIC", """Generic micro air vehicle.""") | |
MAV_TYPE_FIXED_WING = 1 # Fixed wing aircraft. | |
enums["MAV_TYPE"][1] = EnumEntry("MAV_TYPE_FIXED_WING", """Fixed wing aircraft.""") | |
MAV_TYPE_QUADROTOR = 2 # Quadrotor | |
enums["MAV_TYPE"][2] = EnumEntry("MAV_TYPE_QUADROTOR", """Quadrotor""") | |
MAV_TYPE_COAXIAL = 3 # Coaxial helicopter | |
enums["MAV_TYPE"][3] = EnumEntry("MAV_TYPE_COAXIAL", """Coaxial helicopter""") | |
MAV_TYPE_HELICOPTER = 4 # Normal helicopter with tail rotor. | |
enums["MAV_TYPE"][4] = EnumEntry("MAV_TYPE_HELICOPTER", """Normal helicopter with tail rotor.""") | |
MAV_TYPE_VTOL_TILTROTOR = 21 # Tiltrotor VTOL | |
enums["MAV_TYPE"][21] = EnumEntry("MAV_TYPE_VTOL_TILTROTOR", """Tiltrotor VTOL""") | |
MAV_TYPE_VTOL_RESERVED2 = 22 # VTOL reserved 2 | |
enums["MAV_TYPE"][22] = EnumEntry("MAV_TYPE_VTOL_RESERVED2", """VTOL reserved 2""") | |
MAV_TYPE_VTOL_RESERVED3 = 23 # VTOL reserved 3 | |
enums["MAV_TYPE"][23] = EnumEntry("MAV_TYPE_VTOL_RESERVED3", """VTOL reserved 3""") | |
MAV_TYPE_VTOL_RESERVED4 = 24 # VTOL reserved 4 | |
enums["MAV_TYPE"][24] = EnumEntry("MAV_TYPE_VTOL_RESERVED4", """VTOL reserved 4""") | |
MAV_TYPE_ENUM_END = 25 | |
enums["MAV_TYPE"][25] = EnumEntry("MAV_TYPE_ENUM_END", """""") | |
# FIRMWARE_VERSION_TYPE | |
enums["FIRMWARE_VERSION_TYPE"] = {} | |
FIRMWARE_VERSION_TYPE_DEV = 0 # development release | |
enums["FIRMWARE_VERSION_TYPE"][0] = EnumEntry("FIRMWARE_VERSION_TYPE_DEV", """development release""") | |
FIRMWARE_VERSION_TYPE_ALPHA = 64 # alpha release | |
enums["FIRMWARE_VERSION_TYPE"][64] = EnumEntry("FIRMWARE_VERSION_TYPE_ALPHA", """alpha release""") | |
FIRMWARE_VERSION_TYPE_BETA = 128 # beta release | |
enums["FIRMWARE_VERSION_TYPE"][128] = EnumEntry("FIRMWARE_VERSION_TYPE_BETA", """beta release""") | |
FIRMWARE_VERSION_TYPE_RC = 192 # release candidate | |
enums["FIRMWARE_VERSION_TYPE"][192] = EnumEntry("FIRMWARE_VERSION_TYPE_RC", """release candidate""") | |
FIRMWARE_VERSION_TYPE_OFFICIAL = 255 # official stable release | |
enums["FIRMWARE_VERSION_TYPE"][255] = EnumEntry("FIRMWARE_VERSION_TYPE_OFFICIAL", """official stable release""") | |
FIRMWARE_VERSION_TYPE_ENUM_END = 256 | |
enums["FIRMWARE_VERSION_TYPE"][256] = EnumEntry("FIRMWARE_VERSION_TYPE_ENUM_END", """""") | |
# MAV_CMD | |
enums["MAV_CMD"] = {} | |
MAV_CMD_NAV_WAYPOINT = 16 # Navigate to MISSION. | |
enums["MAV_CMD"][16] = EnumEntry("MAV_CMD_NAV_WAYPOINT", """Navigate to MISSION.""") | |
enums["MAV_CMD"][16].param[1] = """Hold time in decimal seconds. (ignored by fixed wing, time to stay at MISSION for rotary wing)""" | |
enums["MAV_CMD"][16].param[2] = """Acceptance radius in meters (if the sphere with this radius is hit, the MISSION counts as reached)""" | |
enums["MAV_CMD"][16].param[3] = """0 to pass through the WP, if > 0 radius in meters to pass by WP. Positive value for clockwise orbit, negative value for counter-clockwise orbit. Allows trajectory control.""" | |
enums["MAV_CMD"][16].param[4] = """Desired yaw angle at MISSION (rotary wing)""" | |
enums["MAV_CMD"][16].param[5] = """Latitude""" | |
enums["MAV_CMD"][16].param[6] = """Longitude""" | |
enums["MAV_CMD"][16].param[7] = """Altitude""" | |
MAV_CMD_NAV_GUIDED_ENABLE = 92 # hand control over to an external controller | |
enums["MAV_CMD"][92] = EnumEntry("MAV_CMD_NAV_GUIDED_ENABLE", """hand control over to an external controller""") | |
enums["MAV_CMD"][92].param[1] = """On / Off (> 0.5f on)""" | |
enums["MAV_CMD"][92].param[2] = """Empty""" | |
enums["MAV_CMD"][92].param[3] = """Empty""" | |
enums["MAV_CMD"][92].param[4] = """Empty""" | |
enums["MAV_CMD"][92].param[5] = """Empty""" | |
enums["MAV_CMD"][92].param[6] = """Empty""" | |
enums["MAV_CMD"][92].param[7] = """Empty""" | |
MAV_CMD_DO_GO_AROUND = 191 # Mission command to safely abort an autonmous landing. | |
enums["MAV_CMD"][191] = EnumEntry("MAV_CMD_DO_GO_AROUND", """Mission command to safely abort an autonmous landing.""") | |
enums["MAV_CMD"][191].param[1] = """Altitude (meters)""" | |
enums["MAV_CMD"][191].param[2] = """Empty""" | |
enums["MAV_CMD"][191].param[3] = """Empty""" | |
enums["MAV_CMD"][191].param[4] = """Empty""" | |
enums["MAV_CMD"][191].param[5] = """Empty""" | |
enums["MAV_CMD"][191].param[6] = """Empty""" | |
enums["MAV_CMD"][191].param[7] = """Empty""" | |
MAV_CMD_ENUM_END = 192 | |
enums["MAV_CMD"][192] = EnumEntry("MAV_CMD_ENUM_END", """""") | |
# message IDs | |
MAVLINK_MSG_ID_BAD_DATA = -1 | |
MAVLINK_MSG_ID_UNKNOWN = -2 | |
MAVLINK_MSG_ID_HEARTBEAT = 0 | |
MAVLINK_MSG_ID_SYS_STATUS = 1 | |
MAVLINK_MSG_ID_SYSTEM_TIME = 2 | |
MAVLINK_MSG_ID_PING = 4 | |
class MAVLink_heartbeat_message(MAVLink_message): | |
""" | |
The heartbeat message shows that a system is present and | |
responding. The type of the MAV and Autopilot hardware allow the | |
receiving system to treat further messages from this system | |
appropriate (e.g. by laying out the user interface based on the | |
autopilot). | |
""" | |
id = MAVLINK_MSG_ID_HEARTBEAT | |
name = "HEARTBEAT" | |
fieldnames = ["type", "autopilot", "base_mode", "custom_mode", "system_status", "mavlink_version"] | |
ordered_fieldnames = ["custom_mode", "type", "autopilot", "base_mode", "system_status", "mavlink_version"] | |
fieldtypes = ["uint8_t", "uint8_t", "uint8_t", "uint32_t", "uint8_t", "uint8_t"] | |
fielddisplays_by_name = {} | |
fieldenums_by_name = {} | |
fieldunits_by_name = {} | |
format = "<IBBBBB" | |
native_format = bytearray("<IBBBBB", "ascii") | |
orders = [1, 2, 3, 0, 4, 5] | |
lengths = [1, 1, 1, 1, 1, 1] | |
array_lengths = [0, 0, 0, 0, 0, 0] | |
crc_extra = 50 | |
unpacker = struct.Struct("<IBBBBB") | |
instance_field = None | |
instance_offset = -1 | |
def __init__(self, type, autopilot, base_mode, custom_mode, system_status, mavlink_version): | |
MAVLink_message.__init__(self, MAVLink_heartbeat_message.id, MAVLink_heartbeat_message.name) | |
self._fieldnames = MAVLink_heartbeat_message.fieldnames | |
self._instance_field = MAVLink_heartbeat_message.instance_field | |
self._instance_offset = MAVLink_heartbeat_message.instance_offset | |
self.type = type | |
self.autopilot = autopilot | |
self.base_mode = base_mode | |
self.custom_mode = custom_mode | |
self.system_status = system_status | |
self.mavlink_version = mavlink_version | |
def pack(self, mav, force_mavlink1=False): | |
return MAVLink_message.pack(self, mav, 50, struct.pack("<IBBBBB", self.custom_mode, self.type, self.autopilot, self.base_mode, self.system_status, self.mavlink_version), force_mavlink1=force_mavlink1) | |
class MAVLink_sys_status_message(MAVLink_message): | |
""" | |
The general system state. If the system is following the MAVLink | |
standard, the system state is mainly defined by three orthogonal | |
states/modes: The system mode, which is either LOCKED (motors shut | |
down and locked), MANUAL (system under RC control), GUIDED (system | |
with autonomous position control, position setpoint controlled | |
manually) or AUTO (system guided by path/waypoint planner). The | |
NAV_MODE defined the current flight state: LIFTOFF (often an open- | |
loop maneuver), LANDING, WAYPOINTS or VECTOR. This represents the | |
internal navigation state machine. The system status shows wether | |
the system is currently active or not and if an emergency occured. | |
During the CRITICAL and EMERGENCY states the MAV is still | |
considered to be active, but should start emergency procedures | |
autonomously. After a failure occured it should first move from | |
active to critical to allow manual intervention and then move to | |
emergency after a certain timeout. | |
""" | |
id = MAVLINK_MSG_ID_SYS_STATUS | |
name = "SYS_STATUS" | |
fieldnames = ["onboard_control_sensors_present", "onboard_control_sensors_enabled", "onboard_control_sensors_health", "load", "voltage_battery", "current_battery", "battery_remaining", "drop_rate_comm", "errors_comm", "errors_count1", "errors_count2", "errors_count3", "errors_count4"] | |
ordered_fieldnames = ["onboard_control_sensors_present", "onboard_control_sensors_enabled", "onboard_control_sensors_health", "load", "voltage_battery", "current_battery", "drop_rate_comm", "errors_comm", "errors_count1", "errors_count2", "errors_count3", "errors_count4", "battery_remaining"] | |
fieldtypes = ["uint32_t", "uint32_t", "uint32_t", "uint16_t", "uint16_t", "int16_t", "int8_t", "uint16_t", "uint16_t", "uint16_t", "uint16_t", "uint16_t", "uint16_t"] | |
fielddisplays_by_name = {} | |
fieldenums_by_name = {} | |
fieldunits_by_name = {} | |
format = "<IIIHHhHHHHHHb" | |
native_format = bytearray("<IIIHHhHHHHHHb", "ascii") | |
orders = [0, 1, 2, 3, 4, 5, 12, 6, 7, 8, 9, 10, 11] | |
lengths = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1] | |
array_lengths = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] | |
crc_extra = 124 | |
unpacker = struct.Struct("<IIIHHhHHHHHHb") | |
instance_field = None | |
instance_offset = -1 | |
def __init__(self, onboard_control_sensors_present, onboard_control_sensors_enabled, onboard_control_sensors_health, load, voltage_battery, current_battery, battery_remaining, drop_rate_comm, errors_comm, errors_count1, errors_count2, errors_count3, errors_count4): | |
MAVLink_message.__init__(self, MAVLink_sys_status_message.id, MAVLink_sys_status_message.name) | |
self._fieldnames = MAVLink_sys_status_message.fieldnames | |
self._instance_field = MAVLink_sys_status_message.instance_field | |
self._instance_offset = MAVLink_sys_status_message.instance_offset | |
self.onboard_control_sensors_present = onboard_control_sensors_present | |
self.onboard_control_sensors_enabled = onboard_control_sensors_enabled | |
self.onboard_control_sensors_health = onboard_control_sensors_health | |
self.load = load | |
self.voltage_battery = voltage_battery | |
self.current_battery = current_battery | |
self.battery_remaining = battery_remaining | |
self.drop_rate_comm = drop_rate_comm | |
self.errors_comm = errors_comm | |
self.errors_count1 = errors_count1 | |
self.errors_count2 = errors_count2 | |
self.errors_count3 = errors_count3 | |
self.errors_count4 = errors_count4 | |
def pack(self, mav, force_mavlink1=False): | |
return MAVLink_message.pack(self, mav, 124, struct.pack("<IIIHHhHHHHHHb", self.onboard_control_sensors_present, self.onboard_control_sensors_enabled, self.onboard_control_sensors_health, self.load, self.voltage_battery, self.current_battery, self.drop_rate_comm, self.errors_comm, self.errors_count1, self.errors_count2, self.errors_count3, self.errors_count4, self.battery_remaining), force_mavlink1=force_mavlink1) | |
class MAVLink_system_time_message(MAVLink_message): | |
""" | |
The system time is the time of the master clock, typically the | |
computer clock of the main onboard computer. | |
""" | |
id = MAVLINK_MSG_ID_SYSTEM_TIME | |
name = "SYSTEM_TIME" | |
fieldnames = ["time_unix_usec", "time_boot_ms"] | |
ordered_fieldnames = ["time_unix_usec", "time_boot_ms"] | |
fieldtypes = ["uint64_t", "uint32_t"] | |
fielddisplays_by_name = {} | |
fieldenums_by_name = {} | |
fieldunits_by_name = {} | |
format = "<QI" | |
native_format = bytearray("<QI", "ascii") | |
orders = [0, 1] | |
lengths = [1, 1] | |
array_lengths = [0, 0] | |
crc_extra = 137 | |
unpacker = struct.Struct("<QI") | |
instance_field = None | |
instance_offset = -1 | |
def __init__(self, time_unix_usec, time_boot_ms): | |
MAVLink_message.__init__(self, MAVLink_system_time_message.id, MAVLink_system_time_message.name) | |
self._fieldnames = MAVLink_system_time_message.fieldnames | |
self._instance_field = MAVLink_system_time_message.instance_field | |
self._instance_offset = MAVLink_system_time_message.instance_offset | |
self.time_unix_usec = time_unix_usec | |
self.time_boot_ms = time_boot_ms | |
def pack(self, mav, force_mavlink1=False): | |
return MAVLink_message.pack(self, mav, 137, struct.pack("<QI", self.time_unix_usec, self.time_boot_ms), force_mavlink1=force_mavlink1) | |
class MAVLink_ping_message(MAVLink_message): | |
""" | |
A ping message either requesting or responding to a ping. This | |
allows to measure the system latencies, including serial port, | |
radio modem and UDP connections. | |
""" | |
id = MAVLINK_MSG_ID_PING | |
name = "PING" | |
fieldnames = ["time_usec", "seq", "target_system", "target_component"] | |
ordered_fieldnames = ["time_usec", "seq", "target_system", "target_component"] | |
fieldtypes = ["uint64_t", "uint32_t", "uint8_t", "uint8_t"] | |
fielddisplays_by_name = {} | |
fieldenums_by_name = {} | |
fieldunits_by_name = {} | |
format = "<QIBB" | |
native_format = bytearray("<QIBB", "ascii") | |
orders = [0, 1, 2, 3] | |
lengths = [1, 1, 1, 1] | |
array_lengths = [0, 0, 0, 0] | |
crc_extra = 237 | |
unpacker = struct.Struct("<QIBB") | |
instance_field = None | |
instance_offset = -1 | |
def __init__(self, time_usec, seq, target_system, target_component): | |
MAVLink_message.__init__(self, MAVLink_ping_message.id, MAVLink_ping_message.name) | |
self._fieldnames = MAVLink_ping_message.fieldnames | |
self._instance_field = MAVLink_ping_message.instance_field | |
self._instance_offset = MAVLink_ping_message.instance_offset | |
self.time_usec = time_usec | |
self.seq = seq | |
self.target_system = target_system | |
self.target_component = target_component | |
def pack(self, mav, force_mavlink1=False): | |
return MAVLink_message.pack(self, mav, 237, struct.pack("<QIBB", self.time_usec, self.seq, self.target_system, self.target_component), force_mavlink1=force_mavlink1) | |
mavlink_map = { | |
MAVLINK_MSG_ID_HEARTBEAT: MAVLink_heartbeat_message, | |
MAVLINK_MSG_ID_SYS_STATUS: MAVLink_sys_status_message, | |
MAVLINK_MSG_ID_SYSTEM_TIME: MAVLink_system_time_message, | |
MAVLINK_MSG_ID_PING: MAVLink_ping_message, | |
} | |
class MAVError(Exception): | |
"""MAVLink error class""" | |
def __init__(self, msg): | |
Exception.__init__(self, msg) | |
self.message = msg | |
class MAVString(str): | |
"""NUL terminated string""" | |
def __init__(self, s): | |
str.__init__(self) | |
def __str__(self): | |
i = self.find(chr(0)) | |
if i == -1: | |
return self[:] | |
return self[0:i] | |
class MAVLink_bad_data(MAVLink_message): | |
""" | |
a piece of bad data in a mavlink stream | |
""" | |
def __init__(self, data, reason): | |
MAVLink_message.__init__(self, MAVLINK_MSG_ID_BAD_DATA, "BAD_DATA") | |
self._fieldnames = ["data", "reason"] | |
self.data = data | |
self.reason = reason | |
self._msgbuf = data | |
self._instance_field = None | |
def __str__(self): | |
"""Override the __str__ function from MAVLink_messages because non-printable characters are common in to be the reason for this message to exist.""" | |
return "%s {%s, data:%s}" % (self._type, self.reason, [("%x" % ord(i) if isinstance(i, str) else "%x" % i) for i in self.data]) | |
class MAVLink_unknown(MAVLink_message): | |
""" | |
a message that we don't have in the XML used when built | |
""" | |
def __init__(self, msgid, data): | |
MAVLink_message.__init__(self, MAVLINK_MSG_ID_UNKNOWN, "UNKNOWN_%u" % msgid) | |
self._fieldnames = ["data"] | |
self.data = data | |
self._msgbuf = data | |
self._instance_field = None | |
def __str__(self): | |
"""Override the __str__ function from MAVLink_messages because non-printable characters are common.""" | |
return "%s {data:%s}" % (self._type, [("%x" % ord(i) if isinstance(i, str) else "%x" % i) for i in self.data]) | |
class MAVLinkSigning(object): | |
"""MAVLink signing state class""" | |
def __init__(self): | |
self.secret_key = None | |
self.timestamp = 0 | |
self.link_id = 0 | |
self.sign_outgoing = False | |
self.allow_unsigned_callback = None | |
self.stream_timestamps = {} | |
self.sig_count = 0 | |
self.badsig_count = 0 | |
self.goodsig_count = 0 | |
self.unsigned_count = 0 | |
self.reject_count = 0 | |
class MAVLink(object): | |
"""MAVLink protocol handling class""" | |
def __init__(self, file, srcSystem=0, srcComponent=0, use_native=False): | |
self.seq = 0 | |
self.file = file | |
self.srcSystem = srcSystem | |
self.srcComponent = srcComponent | |
self.callback = None | |
self.callback_args = None | |
self.callback_kwargs = None | |
self.send_callback = None | |
self.send_callback_args = None | |
self.send_callback_kwargs = None | |
self.buf = bytearray() | |
self.buf_index = 0 | |
self.expected_length = HEADER_LEN_V1 + 2 | |
self.have_prefix_error = False | |
self.robust_parsing = False | |
self.protocol_marker = 253 | |
self.little_endian = True | |
self.crc_extra = True | |
self.sort_fields = True | |
self.total_packets_sent = 0 | |
self.total_bytes_sent = 0 | |
self.total_packets_received = 0 | |
self.total_bytes_received = 0 | |
self.total_receive_errors = 0 | |
self.startup_time = time.time() | |
self.signing = MAVLinkSigning() | |
if native_supported and (use_native or native_testing or native_force): | |
print("NOTE: mavnative is currently beta-test code") | |
self.native = mavnative.NativeConnection(MAVLink_message, mavlink_map) | |
else: | |
self.native = None | |
if native_testing: | |
self.test_buf = bytearray() | |
self.mav20_unpacker = struct.Struct("<cBBBBBBHB") | |
self.mav10_unpacker = struct.Struct("<cBBBBB") | |
self.mav20_h3_unpacker = struct.Struct("BBB") | |
self.mav_csum_unpacker = struct.Struct("<H") | |
self.mav_sign_unpacker = struct.Struct("<IH") | |
def set_callback(self, callback, *args, **kwargs): | |
self.callback = callback | |
self.callback_args = args | |
self.callback_kwargs = kwargs | |
def set_send_callback(self, callback, *args, **kwargs): | |
self.send_callback = callback | |
self.send_callback_args = args | |
self.send_callback_kwargs = kwargs | |
def send(self, mavmsg, force_mavlink1=False): | |
"""send a MAVLink message""" | |
buf = mavmsg.pack(self, force_mavlink1=force_mavlink1) | |
self.file.write(buf) | |
self.seq = (self.seq + 1) % 256 | |
self.total_packets_sent += 1 | |
self.total_bytes_sent += len(buf) | |
if self.send_callback: | |
self.send_callback(mavmsg, *self.send_callback_args, **self.send_callback_kwargs) | |
def buf_len(self): | |
return len(self.buf) - self.buf_index | |
def bytes_needed(self): | |
"""return number of bytes needed for next parsing stage""" | |
if self.native: | |
ret = self.native.expected_length - self.buf_len() | |
else: | |
ret = self.expected_length - self.buf_len() | |
if ret <= 0: | |
return 1 | |
return ret | |
def __parse_char_native(self, c): | |
"""this method exists only to see in profiling results""" | |
m = self.native.parse_chars(c) | |
return m | |
def __callbacks(self, msg): | |
"""this method exists only to make profiling results easier to read""" | |
if self.callback: | |
self.callback(msg, *self.callback_args, **self.callback_kwargs) | |
def parse_char(self, c): | |
"""input some data bytes, possibly returning a new message""" | |
self.buf.extend(c) | |
self.total_bytes_received += len(c) | |
if self.native: | |
if native_testing: | |
self.test_buf.extend(c) | |
m = self.__parse_char_native(self.test_buf) | |
m2 = self.__parse_char_legacy() | |
if m2 != m: | |
print("Native: %s\nLegacy: %s\n" % (m, m2)) | |
raise Exception("Native vs. Legacy mismatch") | |
else: | |
m = self.__parse_char_native(self.buf) | |
else: | |
m = self.__parse_char_legacy() | |
if m is not None: | |
self.total_packets_received += 1 | |
self.__callbacks(m) | |
else: | |
# XXX The idea here is if we've read something and there's nothing left in | |
# the buffer, reset it to 0 which frees the memory | |
if self.buf_len() == 0 and self.buf_index != 0: | |
self.buf = bytearray() | |
self.buf_index = 0 | |
return m | |
def __parse_char_legacy(self): | |
"""input some data bytes, possibly returning a new message (uses no native code)""" | |
header_len = HEADER_LEN_V1 | |
if self.buf_len() >= 1 and self.buf[self.buf_index] == PROTOCOL_MARKER_V2: | |
header_len = HEADER_LEN_V2 | |
if self.buf_len() >= 1 and self.buf[self.buf_index] != PROTOCOL_MARKER_V1 and self.buf[self.buf_index] != PROTOCOL_MARKER_V2: | |
magic = self.buf[self.buf_index] | |
self.buf_index += 1 | |
if self.robust_parsing: | |
m = MAVLink_bad_data(bytearray([magic]), "Bad prefix") | |
self.expected_length = header_len + 2 | |
self.total_receive_errors += 1 | |
return m | |
if self.have_prefix_error: | |
return None | |
self.have_prefix_error = True | |
self.total_receive_errors += 1 | |
raise MAVError("invalid MAVLink prefix '%s'" % magic) | |
self.have_prefix_error = False | |
if self.buf_len() >= 3: | |
sbuf = self.buf[self.buf_index : 3 + self.buf_index] | |
if sys.version_info.major < 3: | |
sbuf = str(sbuf) | |
(magic, self.expected_length, incompat_flags) = self.mav20_h3_unpacker.unpack(sbuf) | |
if magic == PROTOCOL_MARKER_V2 and (incompat_flags & MAVLINK_IFLAG_SIGNED): | |
self.expected_length += MAVLINK_SIGNATURE_BLOCK_LEN | |
self.expected_length += header_len + 2 | |
if self.expected_length >= (header_len + 2) and self.buf_len() >= self.expected_length: | |
mbuf = array.array("B", self.buf[self.buf_index : self.buf_index + self.expected_length]) | |
self.buf_index += self.expected_length | |
self.expected_length = header_len + 2 | |
if self.robust_parsing: | |
try: | |
if magic == PROTOCOL_MARKER_V2 and (incompat_flags & ~MAVLINK_IFLAG_SIGNED) != 0: | |
raise MAVError("invalid incompat_flags 0x%x 0x%x %u" % (incompat_flags, magic, self.expected_length)) | |
m = self.decode(mbuf) | |
except MAVError as reason: | |
m = MAVLink_bad_data(mbuf, reason.message) | |
self.total_receive_errors += 1 | |
else: | |
if magic == PROTOCOL_MARKER_V2 and (incompat_flags & ~MAVLINK_IFLAG_SIGNED) != 0: | |
raise MAVError("invalid incompat_flags 0x%x 0x%x %u" % (incompat_flags, magic, self.expected_length)) | |
m = self.decode(mbuf) | |
return m | |
return None | |
def parse_buffer(self, s): | |
"""input some data bytes, possibly returning a list of new messages""" | |
m = self.parse_char(s) | |
if m is None: | |
return None | |
ret = [m] | |
while True: | |
m = self.parse_char("") | |
if m is None: | |
return ret | |
ret.append(m) | |
return ret | |
def check_signature(self, msgbuf, srcSystem, srcComponent): | |
"""check signature on incoming message""" | |
if isinstance(msgbuf, array.array): | |
try: | |
msgbuf = msgbuf.tostring() | |
except: | |
msgbuf = msgbuf.tobytes() | |
timestamp_buf = msgbuf[-12:-6] | |
link_id = msgbuf[-13] | |
(tlow, thigh) = self.mav_sign_unpacker.unpack(timestamp_buf) | |
timestamp = tlow + (thigh << 32) | |
# see if the timestamp is acceptable | |
stream_key = (link_id, srcSystem, srcComponent) | |
if stream_key in self.signing.stream_timestamps: | |
if timestamp <= self.signing.stream_timestamps[stream_key]: | |
# reject old timestamp | |
# print("old timestamp") | |
return False | |
else: | |
# a new stream has appeared. Accept the timestamp if it is at most | |
# one minute behind our current timestamp | |
if timestamp + 6000 * 1000 < self.signing.timestamp: | |
# print("bad new stream ", timestamp/(100.0 * 1000 * 60 * 60 * 24 * 365), self.signing.timestamp/(100.0 * 1000 * 60 * 60 * 24 * 365)) | |
return False | |
self.signing.stream_timestamps[stream_key] = timestamp | |
# print("new stream") | |
h = hashlib.new("sha256") | |
h.update(self.signing.secret_key) | |
h.update(msgbuf[:-6]) | |
if str(type(msgbuf)) == "<class 'bytes'>" or str(type(msgbuf)) == "<class 'bytearray'>": | |
# Python 3 | |
sig1 = h.digest()[:6] | |
sig2 = msgbuf[-6:] | |
else: | |
sig1 = str(h.digest())[:6] | |
sig2 = str(msgbuf)[-6:] | |
if sig1 != sig2: | |
# print("sig mismatch") | |
return False | |
# the timestamp we next send with is the max of the received timestamp and | |
# our current timestamp | |
self.signing.timestamp = max(self.signing.timestamp, timestamp) | |
return True | |
def decode(self, msgbuf): | |
"""decode a buffer as a MAVLink message""" | |
# decode the header | |
if msgbuf[0] != PROTOCOL_MARKER_V1: | |
headerlen = 10 | |
try: | |
magic, mlen, incompat_flags, compat_flags, seq, srcSystem, srcComponent, msgIdlow, msgIdhigh = self.mav20_unpacker.unpack(msgbuf[:headerlen]) | |
except struct.error as emsg: | |
raise MAVError("Unable to unpack MAVLink header: %s" % emsg) | |
msgId = msgIdlow | (msgIdhigh << 16) | |
mapkey = msgId | |
else: | |
headerlen = 6 | |
try: | |
magic, mlen, seq, srcSystem, srcComponent, msgId = self.mav10_unpacker.unpack(msgbuf[:headerlen]) | |
incompat_flags = 0 | |
compat_flags = 0 | |
except struct.error as emsg: | |
raise MAVError("Unable to unpack MAVLink header: %s" % emsg) | |
mapkey = msgId | |
if (incompat_flags & MAVLINK_IFLAG_SIGNED) != 0: | |
signature_len = MAVLINK_SIGNATURE_BLOCK_LEN | |
else: | |
signature_len = 0 | |
if ord(magic) != PROTOCOL_MARKER_V1 and ord(magic) != PROTOCOL_MARKER_V2: | |
raise MAVError("invalid MAVLink prefix '%s'" % magic) | |
if mlen != len(msgbuf) - (headerlen + 2 + signature_len): | |
raise MAVError("invalid MAVLink message length. Got %u expected %u, msgId=%u headerlen=%u" % (len(msgbuf) - (headerlen + 2 + signature_len), mlen, msgId, headerlen)) | |
if not mapkey in mavlink_map: | |
return MAVLink_unknown(msgId, msgbuf) | |
# decode the payload | |
type = mavlink_map[mapkey] | |
fmt = type.format | |
order_map = type.orders | |
len_map = type.lengths | |
crc_extra = type.crc_extra | |
# decode the checksum | |
try: | |
(crc,) = self.mav_csum_unpacker.unpack(msgbuf[-(2 + signature_len) :][:2]) | |
except struct.error as emsg: | |
raise MAVError("Unable to unpack MAVLink CRC: %s" % emsg) | |
crcbuf = msgbuf[1 : -(2 + signature_len)] | |
if True: | |
# using CRC extra | |
crcbuf.append(crc_extra) | |
crc2 = x25crc(crcbuf) | |
if crc != crc2.crc and not MAVLINK_IGNORE_CRC: | |
raise MAVError("invalid MAVLink CRC in msgID %u 0x%04x should be 0x%04x" % (msgId, crc, crc2.crc)) | |
sig_ok = False | |
if signature_len == MAVLINK_SIGNATURE_BLOCK_LEN: | |
self.signing.sig_count += 1 | |
if self.signing.secret_key is not None: | |
accept_signature = False | |
if signature_len == MAVLINK_SIGNATURE_BLOCK_LEN: | |
sig_ok = self.check_signature(msgbuf, srcSystem, srcComponent) | |
accept_signature = sig_ok | |
if sig_ok: | |
self.signing.goodsig_count += 1 | |
else: | |
self.signing.badsig_count += 1 | |
if not accept_signature and self.signing.allow_unsigned_callback is not None: | |
accept_signature = self.signing.allow_unsigned_callback(self, msgId) | |
if accept_signature: | |
self.signing.unsigned_count += 1 | |
else: | |
self.signing.reject_count += 1 | |
elif self.signing.allow_unsigned_callback is not None: | |
accept_signature = self.signing.allow_unsigned_callback(self, msgId) | |
if accept_signature: | |
self.signing.unsigned_count += 1 | |
else: | |
self.signing.reject_count += 1 | |
if not accept_signature: | |
raise MAVError("Invalid signature") | |
csize = type.unpacker.size | |
mbuf = msgbuf[headerlen : -(2 + signature_len)] | |
if len(mbuf) < csize: | |
# zero pad to give right size | |
mbuf.extend([0] * (csize - len(mbuf))) | |
if len(mbuf) < csize: | |
raise MAVError("Bad message of type %s length %u needs %s" % (type, len(mbuf), csize)) | |
mbuf = mbuf[:csize] | |
try: | |
t = type.unpacker.unpack(mbuf) | |
except struct.error as emsg: | |
raise MAVError("Unable to unpack MAVLink payload type=%s fmt=%s payloadLength=%u: %s" % (type, fmt, len(mbuf), emsg)) | |
tlist = list(t) | |
# handle sorted fields | |
if True: | |
t = tlist[:] | |
if sum(len_map) == len(len_map): | |
# message has no arrays in it | |
for i in range(0, len(tlist)): | |
tlist[i] = t[order_map[i]] | |
else: | |
# message has some arrays | |
tlist = [] | |
for i in range(0, len(order_map)): | |
order = order_map[i] | |
L = len_map[order] | |
tip = sum(len_map[:order]) | |
field = t[tip] | |
if L == 1 or isinstance(field, str): | |
tlist.append(field) | |
else: | |
tlist.append(t[tip : (tip + L)]) | |
# terminate any strings | |
for i in range(0, len(tlist)): | |
if type.fieldtypes[i] == "char": | |
if sys.version_info.major >= 3: | |
tlist[i] = to_string(tlist[i]) | |
tlist[i] = str(MAVString(tlist[i])) | |
t = tuple(tlist) | |
# construct the message object | |
try: | |
m = type(*t) | |
except Exception as emsg: | |
raise MAVError("Unable to instantiate MAVLink message of type %s : %s" % (type, emsg)) | |
m._signed = sig_ok | |
if m._signed: | |
m._link_id = msgbuf[-13] | |
m._msgbuf = msgbuf | |
m._payload = msgbuf[6 : -(2 + signature_len)] | |
m._crc = crc | |
m._header = MAVLink_header(msgId, incompat_flags, compat_flags, mlen, seq, srcSystem, srcComponent) | |
return m | |
def heartbeat_encode(self, type, autopilot, base_mode, custom_mode, system_status, mavlink_version=3): | |
""" | |
The heartbeat message shows that a system is present and responding. | |
The type of the MAV and Autopilot hardware allow the receiving | |
system to treat further messages from this system appropriate | |
(e.g. by laying out the user interface based on the | |
autopilot). | |
type : Type of the MAV (quadrotor, helicopter, etc., up to 15 types, defined in MAV_TYPE ENUM) (type:uint8_t) | |
autopilot : Autopilot type / class. defined in MAV_AUTOPILOT ENUM (type:uint8_t) | |
base_mode : System mode bitfield, see MAV_MODE_FLAG ENUM in mavlink/include/mavlink_types.h (type:uint8_t) | |
custom_mode : A bitfield for use for autopilot-specific flags. (type:uint32_t) | |
system_status : System status flag, see MAV_STATE ENUM (type:uint8_t) | |
mavlink_version : MAVLink version, not writable by user, gets added by protocol because of magic data type: uint8_t_mavlink_version (type:uint8_t) | |
""" | |
return MAVLink_heartbeat_message(type, autopilot, base_mode, custom_mode, system_status, mavlink_version) | |
def heartbeat_send(self, type, autopilot, base_mode, custom_mode, system_status, mavlink_version=3, force_mavlink1=False): | |
""" | |
The heartbeat message shows that a system is present and responding. | |
The type of the MAV and Autopilot hardware allow the receiving | |
system to treat further messages from this system appropriate | |
(e.g. by laying out the user interface based on the | |
autopilot). | |
type : Type of the MAV (quadrotor, helicopter, etc., up to 15 types, defined in MAV_TYPE ENUM) (type:uint8_t) | |
autopilot : Autopilot type / class. defined in MAV_AUTOPILOT ENUM (type:uint8_t) | |
base_mode : System mode bitfield, see MAV_MODE_FLAG ENUM in mavlink/include/mavlink_types.h (type:uint8_t) | |
custom_mode : A bitfield for use for autopilot-specific flags. (type:uint32_t) | |
system_status : System status flag, see MAV_STATE ENUM (type:uint8_t) | |
mavlink_version : MAVLink version, not writable by user, gets added by protocol because of magic data type: uint8_t_mavlink_version (type:uint8_t) | |
""" | |
return self.send(self.heartbeat_encode(type, autopilot, base_mode, custom_mode, system_status, mavlink_version), force_mavlink1=force_mavlink1) | |
def sys_status_encode(self, onboard_control_sensors_present, onboard_control_sensors_enabled, onboard_control_sensors_health, load, voltage_battery, current_battery, battery_remaining, drop_rate_comm, errors_comm, errors_count1, errors_count2, errors_count3, errors_count4): | |
""" | |
The general system state. If the system is following the MAVLink | |
standard, the system state is mainly defined by three | |
orthogonal states/modes: The system mode, which is either | |
LOCKED (motors shut down and locked), MANUAL (system under RC | |
control), GUIDED (system with autonomous position control, | |
position setpoint controlled manually) or AUTO (system guided | |
by path/waypoint planner). The NAV_MODE defined the current | |
flight state: LIFTOFF (often an open-loop maneuver), LANDING, | |
WAYPOINTS or VECTOR. This represents the internal navigation | |
state machine. The system status shows wether the system is | |
currently active or not and if an emergency occured. During | |
the CRITICAL and EMERGENCY states the MAV is still considered | |
to be active, but should start emergency procedures | |
autonomously. After a failure occured it should first move | |
from active to critical to allow manual intervention and then | |
move to emergency after a certain timeout. | |
onboard_control_sensors_present : Bitmask showing which onboard controllers and sensors are present. Value of 0: not present. Value of 1: present. Indices defined by ENUM MAV_SYS_STATUS_SENSOR (type:uint32_t) | |
onboard_control_sensors_enabled : Bitmask showing which onboard controllers and sensors are enabled: Value of 0: not enabled. Value of 1: enabled. Indices defined by ENUM MAV_SYS_STATUS_SENSOR (type:uint32_t) | |
onboard_control_sensors_health : Bitmask showing which onboard controllers and sensors are operational or have an error: Value of 0: not enabled. Value of 1: enabled. Indices defined by ENUM MAV_SYS_STATUS_SENSOR (type:uint32_t) | |
load : Maximum usage in percent of the mainloop time, (0%: 0, 100%: 1000) should be always below 1000 (type:uint16_t) | |
voltage_battery : Battery voltage, in millivolts (1 = 1 millivolt) (type:uint16_t) | |
current_battery : Battery current, in 10*milliamperes (1 = 10 milliampere), -1: autopilot does not measure the current (type:int16_t) | |
battery_remaining : Remaining battery energy: (0%: 0, 100%: 100), -1: autopilot estimate the remaining battery (type:int8_t) | |
drop_rate_comm : Communication drops in percent, (0%: 0, 100%: 10'000), (UART, I2C, SPI, CAN), dropped packets on all links (packets that were corrupted on reception on the MAV) (type:uint16_t) | |
errors_comm : Communication errors (UART, I2C, SPI, CAN), dropped packets on all links (packets that were corrupted on reception on the MAV) (type:uint16_t) | |
errors_count1 : Autopilot-specific errors (type:uint16_t) | |
errors_count2 : Autopilot-specific errors (type:uint16_t) | |
errors_count3 : Autopilot-specific errors (type:uint16_t) | |
errors_count4 : Autopilot-specific errors (type:uint16_t) | |
""" | |
return MAVLink_sys_status_message(onboard_control_sensors_present, onboard_control_sensors_enabled, onboard_control_sensors_health, load, voltage_battery, current_battery, battery_remaining, drop_rate_comm, errors_comm, errors_count1, errors_count2, errors_count3, errors_count4) | |
def sys_status_send(self, onboard_control_sensors_present, onboard_control_sensors_enabled, onboard_control_sensors_health, load, voltage_battery, current_battery, battery_remaining, drop_rate_comm, errors_comm, errors_count1, errors_count2, errors_count3, errors_count4, force_mavlink1=False): | |
""" | |
The general system state. If the system is following the MAVLink | |
standard, the system state is mainly defined by three | |
orthogonal states/modes: The system mode, which is either | |
LOCKED (motors shut down and locked), MANUAL (system under RC | |
control), GUIDED (system with autonomous position control, | |
position setpoint controlled manually) or AUTO (system guided | |
by path/waypoint planner). The NAV_MODE defined the current | |
flight state: LIFTOFF (often an open-loop maneuver), LANDING, | |
WAYPOINTS or VECTOR. This represents the internal navigation | |
state machine. The system status shows wether the system is | |
currently active or not and if an emergency occured. During | |
the CRITICAL and EMERGENCY states the MAV is still considered | |
to be active, but should start emergency procedures | |
autonomously. After a failure occured it should first move | |
from active to critical to allow manual intervention and then | |
move to emergency after a certain timeout. | |
onboard_control_sensors_present : Bitmask showing which onboard controllers and sensors are present. Value of 0: not present. Value of 1: present. Indices defined by ENUM MAV_SYS_STATUS_SENSOR (type:uint32_t) | |
onboard_control_sensors_enabled : Bitmask showing which onboard controllers and sensors are enabled: Value of 0: not enabled. Value of 1: enabled. Indices defined by ENUM MAV_SYS_STATUS_SENSOR (type:uint32_t) | |
onboard_control_sensors_health : Bitmask showing which onboard controllers and sensors are operational or have an error: Value of 0: not enabled. Value of 1: enabled. Indices defined by ENUM MAV_SYS_STATUS_SENSOR (type:uint32_t) | |
load : Maximum usage in percent of the mainloop time, (0%: 0, 100%: 1000) should be always below 1000 (type:uint16_t) | |
voltage_battery : Battery voltage, in millivolts (1 = 1 millivolt) (type:uint16_t) | |
current_battery : Battery current, in 10*milliamperes (1 = 10 milliampere), -1: autopilot does not measure the current (type:int16_t) | |
battery_remaining : Remaining battery energy: (0%: 0, 100%: 100), -1: autopilot estimate the remaining battery (type:int8_t) | |
drop_rate_comm : Communication drops in percent, (0%: 0, 100%: 10'000), (UART, I2C, SPI, CAN), dropped packets on all links (packets that were corrupted on reception on the MAV) (type:uint16_t) | |
errors_comm : Communication errors (UART, I2C, SPI, CAN), dropped packets on all links (packets that were corrupted on reception on the MAV) (type:uint16_t) | |
errors_count1 : Autopilot-specific errors (type:uint16_t) | |
errors_count2 : Autopilot-specific errors (type:uint16_t) | |
errors_count3 : Autopilot-specific errors (type:uint16_t) | |
errors_count4 : Autopilot-specific errors (type:uint16_t) | |
""" | |
return self.send(self.sys_status_encode(onboard_control_sensors_present, onboard_control_sensors_enabled, onboard_control_sensors_health, load, voltage_battery, current_battery, battery_remaining, drop_rate_comm, errors_comm, errors_count1, errors_count2, errors_count3, errors_count4), force_mavlink1=force_mavlink1) | |
def system_time_encode(self, time_unix_usec, time_boot_ms): | |
""" | |
The system time is the time of the master clock, typically the | |
computer clock of the main onboard computer. | |
time_unix_usec : Timestamp of the master clock in microseconds since UNIX epoch. (type:uint64_t) | |
time_boot_ms : Timestamp of the component clock since boot time in milliseconds. (type:uint32_t) | |
""" | |
return MAVLink_system_time_message(time_unix_usec, time_boot_ms) | |
def system_time_send(self, time_unix_usec, time_boot_ms, force_mavlink1=False): | |
""" | |
The system time is the time of the master clock, typically the | |
computer clock of the main onboard computer. | |
time_unix_usec : Timestamp of the master clock in microseconds since UNIX epoch. (type:uint64_t) | |
time_boot_ms : Timestamp of the component clock since boot time in milliseconds. (type:uint32_t) | |
""" | |
return self.send(self.system_time_encode(time_unix_usec, time_boot_ms), force_mavlink1=force_mavlink1) | |
def ping_encode(self, time_usec, seq, target_system, target_component): | |
""" | |
A ping message either requesting or responding to a ping. This allows | |
to measure the system latencies, including serial port, radio | |
modem and UDP connections. | |
time_usec : Unix timestamp in microseconds or since system boot if smaller than MAVLink epoch (1.1.2009) (type:uint64_t) | |
seq : PING sequence (type:uint32_t) | |
target_system : 0: request ping from all receiving systems, if greater than 0: message is a ping response and number is the system id of the requesting system (type:uint8_t) | |
target_component : 0: request ping from all receiving components, if greater than 0: message is a ping response and number is the system id of the requesting system (type:uint8_t) | |
""" | |
return MAVLink_ping_message(time_usec, seq, target_system, target_component) | |
def ping_send(self, time_usec, seq, target_system, target_component, force_mavlink1=False): | |
""" | |
A ping message either requesting or responding to a ping. This allows | |
to measure the system latencies, including serial port, radio | |
modem and UDP connections. | |
time_usec : Unix timestamp in microseconds or since system boot if smaller than MAVLink epoch (1.1.2009) (type:uint64_t) | |
seq : PING sequence (type:uint32_t) | |
target_system : 0: request ping from all receiving systems, if greater than 0: message is a ping response and number is the system id of the requesting system (type:uint8_t) | |
target_component : 0: request ping from all receiving components, if greater than 0: message is a ping response and number is the system id of the requesting system (type:uint8_t) | |
""" | |
return self.send(self.ping_encode(time_usec, seq, target_system, target_component), force_mavlink1=force_mavlink1) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
MAVLink protocol implementation (auto-generated by mavgen.py) | |
Generated from: common.xml | |
Note: this file has been auto-generated. DO NOT EDIT | |
''' | |
from __future__ import print_function | |
from builtins import range | |
from builtins import object | |
import struct, array, time, json, os, sys, platform | |
from pymavlink.generator.mavcrc import x25crc | |
import hashlib | |
WIRE_PROTOCOL_VERSION = '2.0' | |
DIALECT = 'before' | |
PROTOCOL_MARKER_V1 = 0xFE | |
PROTOCOL_MARKER_V2 = 0xFD | |
HEADER_LEN_V1 = 6 | |
HEADER_LEN_V2 = 10 | |
MAVLINK_SIGNATURE_BLOCK_LEN = 13 | |
MAVLINK_IFLAG_SIGNED = 0x01 | |
native_supported = platform.system() != 'Windows' # Not yet supported on other dialects | |
native_force = 'MAVNATIVE_FORCE' in os.environ # Will force use of native code regardless of what client app wants | |
native_testing = 'MAVNATIVE_TESTING' in os.environ # Will force both native and legacy code to be used and their results compared | |
if native_supported and float(WIRE_PROTOCOL_VERSION) <= 1: | |
try: | |
import mavnative | |
except ImportError: | |
print('ERROR LOADING MAVNATIVE - falling back to python implementation') | |
native_supported = False | |
else: | |
# mavnative isn't supported for MAVLink2 yet | |
native_supported = False | |
# allow MAV_IGNORE_CRC=1 to ignore CRC, allowing some | |
# corrupted msgs to be seen | |
MAVLINK_IGNORE_CRC = os.environ.get("MAV_IGNORE_CRC",0) | |
# some base types from mavlink_types.h | |
MAVLINK_TYPE_CHAR = 0 | |
MAVLINK_TYPE_UINT8_T = 1 | |
MAVLINK_TYPE_INT8_T = 2 | |
MAVLINK_TYPE_UINT16_T = 3 | |
MAVLINK_TYPE_INT16_T = 4 | |
MAVLINK_TYPE_UINT32_T = 5 | |
MAVLINK_TYPE_INT32_T = 6 | |
MAVLINK_TYPE_UINT64_T = 7 | |
MAVLINK_TYPE_INT64_T = 8 | |
MAVLINK_TYPE_FLOAT = 9 | |
MAVLINK_TYPE_DOUBLE = 10 | |
# swiped from DFReader.py | |
def to_string(s): | |
'''desperate attempt to convert a string regardless of what garbage we get''' | |
try: | |
return s.decode("utf-8") | |
except Exception as e: | |
pass | |
try: | |
s2 = s.encode('utf-8', 'ignore') | |
x = u"%s" % s2 | |
return s2 | |
except Exception: | |
pass | |
# so it's a nasty one. Let's grab as many characters as we can | |
r = '' | |
try: | |
for c in s: | |
r2 = r + c | |
r2 = r2.encode('ascii', 'ignore') | |
x = u"%s" % r2 | |
r = r2 | |
except Exception: | |
pass | |
return r + '_XXX' | |
class MAVLink_header(object): | |
'''MAVLink message header''' | |
def __init__(self, msgId, incompat_flags=0, compat_flags=0, mlen=0, seq=0, srcSystem=0, srcComponent=0): | |
self.mlen = mlen | |
self.seq = seq | |
self.srcSystem = srcSystem | |
self.srcComponent = srcComponent | |
self.msgId = msgId | |
self.incompat_flags = incompat_flags | |
self.compat_flags = compat_flags | |
def pack(self, force_mavlink1=False): | |
if WIRE_PROTOCOL_VERSION == '2.0' and not force_mavlink1: | |
return struct.pack('<BBBBBBBHB', 253, self.mlen, | |
self.incompat_flags, self.compat_flags, | |
self.seq, self.srcSystem, self.srcComponent, | |
self.msgId&0xFFFF, self.msgId>>16) | |
return struct.pack('<BBBBBB', PROTOCOL_MARKER_V1, self.mlen, self.seq, | |
self.srcSystem, self.srcComponent, self.msgId) | |
class MAVLink_message(object): | |
'''base MAVLink message class''' | |
def __init__(self, msgId, name): | |
self._header = MAVLink_header(msgId) | |
self._payload = None | |
self._msgbuf = None | |
self._crc = None | |
self._fieldnames = [] | |
self._type = name | |
self._signed = False | |
self._link_id = None | |
self._instances = None | |
self._instance_field = None | |
def format_attr(self, field): | |
'''override field getter''' | |
raw_attr = getattr(self,field) | |
if isinstance(raw_attr, bytes): | |
raw_attr = to_string(raw_attr).rstrip("\00") | |
return raw_attr | |
def get_msgbuf(self): | |
if isinstance(self._msgbuf, bytearray): | |
return self._msgbuf | |
return bytearray(self._msgbuf) | |
def get_header(self): | |
return self._header | |
def get_payload(self): | |
return self._payload | |
def get_crc(self): | |
return self._crc | |
def get_fieldnames(self): | |
return self._fieldnames | |
def get_type(self): | |
return self._type | |
def get_msgId(self): | |
return self._header.msgId | |
def get_srcSystem(self): | |
return self._header.srcSystem | |
def get_srcComponent(self): | |
return self._header.srcComponent | |
def get_seq(self): | |
return self._header.seq | |
def get_signed(self): | |
return self._signed | |
def get_link_id(self): | |
return self._link_id | |
def __str__(self): | |
ret = '%s {' % self._type | |
for a in self._fieldnames: | |
v = self.format_attr(a) | |
ret += '%s : %s, ' % (a, v) | |
ret = ret[0:-2] + '}' | |
return ret | |
def __ne__(self, other): | |
return not self.__eq__(other) | |
def __eq__(self, other): | |
if other is None: | |
return False | |
if self.get_type() != other.get_type(): | |
return False | |
# We do not compare CRC because native code doesn't provide it | |
#if self.get_crc() != other.get_crc(): | |
# return False | |
if self.get_seq() != other.get_seq(): | |
return False | |
if self.get_srcSystem() != other.get_srcSystem(): | |
return False | |
if self.get_srcComponent() != other.get_srcComponent(): | |
return False | |
for a in self._fieldnames: | |
if self.format_attr(a) != other.format_attr(a): | |
return False | |
return True | |
def to_dict(self): | |
d = dict({}) | |
d['mavpackettype'] = self._type | |
for a in self._fieldnames: | |
d[a] = self.format_attr(a) | |
return d | |
def to_json(self): | |
return json.dumps(self.to_dict()) | |
def sign_packet(self, mav): | |
h = hashlib.new('sha256') | |
self._msgbuf += struct.pack('<BQ', mav.signing.link_id, mav.signing.timestamp)[:7] | |
h.update(mav.signing.secret_key) | |
h.update(self._msgbuf) | |
sig = h.digest()[:6] | |
self._msgbuf += sig | |
mav.signing.timestamp += 1 | |
def pack(self, mav, crc_extra, payload, force_mavlink1=False): | |
plen = len(payload) | |
if WIRE_PROTOCOL_VERSION != '1.0' and not force_mavlink1: | |
# in MAVLink2 we can strip trailing zeros off payloads. This allows for simple | |
# variable length arrays and smaller packets | |
nullbyte = chr(0) | |
# in Python2, type("fred') is str but also type("fred")==bytes | |
if str(type(payload)) == "<class 'bytes'>": | |
nullbyte = 0 | |
while plen > 1 and payload[plen-1] == nullbyte: | |
plen -= 1 | |
self._payload = payload[:plen] | |
incompat_flags = 0 | |
if mav.signing.sign_outgoing: | |
incompat_flags |= MAVLINK_IFLAG_SIGNED | |
self._header = MAVLink_header(self._header.msgId, | |
incompat_flags=incompat_flags, compat_flags=0, | |
mlen=len(self._payload), seq=mav.seq, | |
srcSystem=mav.srcSystem, srcComponent=mav.srcComponent) | |
self._msgbuf = self._header.pack(force_mavlink1=force_mavlink1) + self._payload | |
crc = x25crc(self._msgbuf[1:]) | |
if True: # using CRC extra | |
crc.accumulate_str(struct.pack('B', crc_extra)) | |
self._crc = crc.crc | |
self._msgbuf += struct.pack('<H', self._crc) | |
if mav.signing.sign_outgoing and not force_mavlink1: | |
self.sign_packet(mav) | |
return self._msgbuf | |
def __getitem__(self, key): | |
'''support indexing, allowing for multi-instance sensors in one message''' | |
if self._instances is None: | |
raise IndexError() | |
if not key in self._instances: | |
raise IndexError() | |
return self._instances[key] | |
# enums | |
class EnumEntry(object): | |
def __init__(self, name, description): | |
self.name = name | |
self.description = description | |
self.param = {} | |
enums = {} | |
# MAV_AUTOPILOT | |
enums['MAV_AUTOPILOT'] = {} | |
MAV_AUTOPILOT_GENERIC = 0 # Generic autopilot, full support for everything | |
enums['MAV_AUTOPILOT'][0] = EnumEntry('MAV_AUTOPILOT_GENERIC', '''Generic autopilot, full support for everything''') | |
MAV_AUTOPILOT_RESERVED = 1 # Reserved for future use. | |
enums['MAV_AUTOPILOT'][1] = EnumEntry('MAV_AUTOPILOT_RESERVED', '''Reserved for future use.''') | |
MAV_AUTOPILOT_SLUGS = 2 # SLUGS autopilot, http://slugsuav.soe.ucsc.edu | |
enums['MAV_AUTOPILOT'][2] = EnumEntry('MAV_AUTOPILOT_SLUGS', '''SLUGS autopilot, http://slugsuav.soe.ucsc.edu''') | |
MAV_AUTOPILOT_AEROB = 16 # Aerob -- http://aerob.ru | |
enums['MAV_AUTOPILOT'][16] = EnumEntry('MAV_AUTOPILOT_AEROB', '''Aerob -- http://aerob.ru''') | |
MAV_AUTOPILOT_ASLUAV = 17 # ASLUAV autopilot -- http://www.asl.ethz.ch | |
enums['MAV_AUTOPILOT'][17] = EnumEntry('MAV_AUTOPILOT_ASLUAV', '''ASLUAV autopilot -- http://www.asl.ethz.ch''') | |
MAV_AUTOPILOT_ENUM_END = 18 # | |
enums['MAV_AUTOPILOT'][18] = EnumEntry('MAV_AUTOPILOT_ENUM_END', '''''') | |
# MAV_TYPE | |
enums['MAV_TYPE'] = {} | |
MAV_TYPE_GENERIC = 0 # Generic micro air vehicle. | |
enums['MAV_TYPE'][0] = EnumEntry('MAV_TYPE_GENERIC', '''Generic micro air vehicle.''') | |
MAV_TYPE_FIXED_WING = 1 # Fixed wing aircraft. | |
enums['MAV_TYPE'][1] = EnumEntry('MAV_TYPE_FIXED_WING', '''Fixed wing aircraft.''') | |
MAV_TYPE_QUADROTOR = 2 # Quadrotor | |
enums['MAV_TYPE'][2] = EnumEntry('MAV_TYPE_QUADROTOR', '''Quadrotor''') | |
MAV_TYPE_COAXIAL = 3 # Coaxial helicopter | |
enums['MAV_TYPE'][3] = EnumEntry('MAV_TYPE_COAXIAL', '''Coaxial helicopter''') | |
MAV_TYPE_HELICOPTER = 4 # Normal helicopter with tail rotor. | |
enums['MAV_TYPE'][4] = EnumEntry('MAV_TYPE_HELICOPTER', '''Normal helicopter with tail rotor.''') | |
MAV_TYPE_VTOL_TILTROTOR = 21 # Tiltrotor VTOL | |
enums['MAV_TYPE'][21] = EnumEntry('MAV_TYPE_VTOL_TILTROTOR', '''Tiltrotor VTOL''') | |
MAV_TYPE_VTOL_RESERVED2 = 22 # VTOL reserved 2 | |
enums['MAV_TYPE'][22] = EnumEntry('MAV_TYPE_VTOL_RESERVED2', '''VTOL reserved 2''') | |
MAV_TYPE_VTOL_RESERVED3 = 23 # VTOL reserved 3 | |
enums['MAV_TYPE'][23] = EnumEntry('MAV_TYPE_VTOL_RESERVED3', '''VTOL reserved 3''') | |
MAV_TYPE_VTOL_RESERVED4 = 24 # VTOL reserved 4 | |
enums['MAV_TYPE'][24] = EnumEntry('MAV_TYPE_VTOL_RESERVED4', '''VTOL reserved 4''') | |
MAV_TYPE_ENUM_END = 25 # | |
enums['MAV_TYPE'][25] = EnumEntry('MAV_TYPE_ENUM_END', '''''') | |
# FIRMWARE_VERSION_TYPE | |
enums['FIRMWARE_VERSION_TYPE'] = {} | |
FIRMWARE_VERSION_TYPE_DEV = 0 # development release | |
enums['FIRMWARE_VERSION_TYPE'][0] = EnumEntry('FIRMWARE_VERSION_TYPE_DEV', '''development release''') | |
FIRMWARE_VERSION_TYPE_ALPHA = 64 # alpha release | |
enums['FIRMWARE_VERSION_TYPE'][64] = EnumEntry('FIRMWARE_VERSION_TYPE_ALPHA', '''alpha release''') | |
FIRMWARE_VERSION_TYPE_BETA = 128 # beta release | |
enums['FIRMWARE_VERSION_TYPE'][128] = EnumEntry('FIRMWARE_VERSION_TYPE_BETA', '''beta release''') | |
FIRMWARE_VERSION_TYPE_RC = 192 # release candidate | |
enums['FIRMWARE_VERSION_TYPE'][192] = EnumEntry('FIRMWARE_VERSION_TYPE_RC', '''release candidate''') | |
FIRMWARE_VERSION_TYPE_OFFICIAL = 255 # official stable release | |
enums['FIRMWARE_VERSION_TYPE'][255] = EnumEntry('FIRMWARE_VERSION_TYPE_OFFICIAL', '''official stable release''') | |
FIRMWARE_VERSION_TYPE_ENUM_END = 256 # | |
enums['FIRMWARE_VERSION_TYPE'][256] = EnumEntry('FIRMWARE_VERSION_TYPE_ENUM_END', '''''') | |
# MAV_CMD | |
enums['MAV_CMD'] = {} | |
MAV_CMD_NAV_WAYPOINT = 16 # Navigate to MISSION. | |
enums['MAV_CMD'][16] = EnumEntry('MAV_CMD_NAV_WAYPOINT', '''Navigate to MISSION.''') | |
enums['MAV_CMD'][16].param[1] = '''Hold time in decimal seconds. (ignored by fixed wing, time to stay at MISSION for rotary wing)''' | |
enums['MAV_CMD'][16].param[2] = '''Acceptance radius in meters (if the sphere with this radius is hit, the MISSION counts as reached)''' | |
enums['MAV_CMD'][16].param[3] = '''0 to pass through the WP, if > 0 radius in meters to pass by WP. Positive value for clockwise orbit, negative value for counter-clockwise orbit. Allows trajectory control.''' | |
enums['MAV_CMD'][16].param[4] = '''Desired yaw angle at MISSION (rotary wing)''' | |
enums['MAV_CMD'][16].param[5] = '''Latitude''' | |
enums['MAV_CMD'][16].param[6] = '''Longitude''' | |
enums['MAV_CMD'][16].param[7] = '''Altitude''' | |
MAV_CMD_NAV_GUIDED_ENABLE = 92 # hand control over to an external controller | |
enums['MAV_CMD'][92] = EnumEntry('MAV_CMD_NAV_GUIDED_ENABLE', '''hand control over to an external controller''') | |
enums['MAV_CMD'][92].param[1] = '''On / Off (> 0.5f on)''' | |
enums['MAV_CMD'][92].param[2] = '''Empty''' | |
enums['MAV_CMD'][92].param[3] = '''Empty''' | |
enums['MAV_CMD'][92].param[4] = '''Empty''' | |
enums['MAV_CMD'][92].param[5] = '''Empty''' | |
enums['MAV_CMD'][92].param[6] = '''Empty''' | |
enums['MAV_CMD'][92].param[7] = '''Empty''' | |
MAV_CMD_DO_GO_AROUND = 191 # Mission command to safely abort an autonmous landing. | |
enums['MAV_CMD'][191] = EnumEntry('MAV_CMD_DO_GO_AROUND', '''Mission command to safely abort an autonmous landing.''') | |
enums['MAV_CMD'][191].param[1] = '''Altitude (meters)''' | |
enums['MAV_CMD'][191].param[2] = '''Empty''' | |
enums['MAV_CMD'][191].param[3] = '''Empty''' | |
enums['MAV_CMD'][191].param[4] = '''Empty''' | |
enums['MAV_CMD'][191].param[5] = '''Empty''' | |
enums['MAV_CMD'][191].param[6] = '''Empty''' | |
enums['MAV_CMD'][191].param[7] = '''Empty''' | |
MAV_CMD_ENUM_END = 192 # | |
enums['MAV_CMD'][192] = EnumEntry('MAV_CMD_ENUM_END', '''''') | |
# message IDs | |
MAVLINK_MSG_ID_BAD_DATA = -1 | |
MAVLINK_MSG_ID_UNKNOWN = -2 | |
MAVLINK_MSG_ID_HEARTBEAT = 0 | |
MAVLINK_MSG_ID_SYS_STATUS = 1 | |
MAVLINK_MSG_ID_SYSTEM_TIME = 2 | |
MAVLINK_MSG_ID_PING = 4 | |
class MAVLink_heartbeat_message(MAVLink_message): | |
''' | |
The heartbeat message shows that a system is present and | |
responding. The type of the MAV and Autopilot hardware allow | |
the receiving system to treat further messages from this | |
system appropriate (e.g. by laying out the user interface | |
based on the autopilot). | |
''' | |
id = MAVLINK_MSG_ID_HEARTBEAT | |
name = 'HEARTBEAT' | |
fieldnames = ['type', 'autopilot', 'base_mode', 'custom_mode', 'system_status', 'mavlink_version'] | |
ordered_fieldnames = ['custom_mode', 'type', 'autopilot', 'base_mode', 'system_status', 'mavlink_version'] | |
fieldtypes = ['uint8_t', 'uint8_t', 'uint8_t', 'uint32_t', 'uint8_t', 'uint8_t'] | |
fielddisplays_by_name = {} | |
fieldenums_by_name = {} | |
fieldunits_by_name = {} | |
format = '<IBBBBB' | |
native_format = bytearray('<IBBBBB', 'ascii') | |
orders = [1, 2, 3, 0, 4, 5] | |
lengths = [1, 1, 1, 1, 1, 1] | |
array_lengths = [0, 0, 0, 0, 0, 0] | |
crc_extra = 50 | |
unpacker = struct.Struct('<IBBBBB') | |
instance_field = None | |
instance_offset = -1 | |
def __init__(self, type, autopilot, base_mode, custom_mode, system_status, mavlink_version): | |
MAVLink_message.__init__(self, MAVLink_heartbeat_message.id, MAVLink_heartbeat_message.name) | |
self._fieldnames = MAVLink_heartbeat_message.fieldnames | |
self._instance_field = MAVLink_heartbeat_message.instance_field | |
self._instance_offset = MAVLink_heartbeat_message.instance_offset | |
self.type = type | |
self.autopilot = autopilot | |
self.base_mode = base_mode | |
self.custom_mode = custom_mode | |
self.system_status = system_status | |
self.mavlink_version = mavlink_version | |
def pack(self, mav, force_mavlink1=False): | |
return MAVLink_message.pack(self, mav, 50, struct.pack('<IBBBBB', self.custom_mode, self.type, self.autopilot, self.base_mode, self.system_status, self.mavlink_version), force_mavlink1=force_mavlink1) | |
class MAVLink_sys_status_message(MAVLink_message): | |
''' | |
The general system state. If the system is following the | |
MAVLink standard, the system state is mainly defined by three | |
orthogonal states/modes: The system mode, which is either | |
LOCKED (motors shut down and locked), MANUAL (system under RC | |
control), GUIDED (system with autonomous position control, | |
position setpoint controlled manually) or AUTO (system guided | |
by path/waypoint planner). The NAV_MODE defined the current | |
flight state: LIFTOFF (often an open-loop maneuver), LANDING, | |
WAYPOINTS or VECTOR. This represents the internal navigation | |
state machine. The system status shows wether the system is | |
currently active or not and if an emergency occured. During | |
the CRITICAL and EMERGENCY states the MAV is still considered | |
to be active, but should start emergency procedures | |
autonomously. After a failure occured it should first move | |
from active to critical to allow manual intervention and then | |
move to emergency after a certain timeout. | |
''' | |
id = MAVLINK_MSG_ID_SYS_STATUS | |
name = 'SYS_STATUS' | |
fieldnames = ['onboard_control_sensors_present', 'onboard_control_sensors_enabled', 'onboard_control_sensors_health', 'load', 'voltage_battery', 'current_battery', 'battery_remaining', 'drop_rate_comm', 'errors_comm', 'errors_count1', 'errors_count2', 'errors_count3', 'errors_count4'] | |
ordered_fieldnames = ['onboard_control_sensors_present', 'onboard_control_sensors_enabled', 'onboard_control_sensors_health', 'load', 'voltage_battery', 'current_battery', 'drop_rate_comm', 'errors_comm', 'errors_count1', 'errors_count2', 'errors_count3', 'errors_count4', 'battery_remaining'] | |
fieldtypes = ['uint32_t', 'uint32_t', 'uint32_t', 'uint16_t', 'uint16_t', 'int16_t', 'int8_t', 'uint16_t', 'uint16_t', 'uint16_t', 'uint16_t', 'uint16_t', 'uint16_t'] | |
fielddisplays_by_name = {} | |
fieldenums_by_name = {} | |
fieldunits_by_name = {} | |
format = '<IIIHHhHHHHHHb' | |
native_format = bytearray('<IIIHHhHHHHHHb', 'ascii') | |
orders = [0, 1, 2, 3, 4, 5, 12, 6, 7, 8, 9, 10, 11] | |
lengths = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1] | |
array_lengths = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] | |
crc_extra = 124 | |
unpacker = struct.Struct('<IIIHHhHHHHHHb') | |
instance_field = None | |
instance_offset = -1 | |
def __init__(self, onboard_control_sensors_present, onboard_control_sensors_enabled, onboard_control_sensors_health, load, voltage_battery, current_battery, battery_remaining, drop_rate_comm, errors_comm, errors_count1, errors_count2, errors_count3, errors_count4): | |
MAVLink_message.__init__(self, MAVLink_sys_status_message.id, MAVLink_sys_status_message.name) | |
self._fieldnames = MAVLink_sys_status_message.fieldnames | |
self._instance_field = MAVLink_sys_status_message.instance_field | |
self._instance_offset = MAVLink_sys_status_message.instance_offset | |
self.onboard_control_sensors_present = onboard_control_sensors_present | |
self.onboard_control_sensors_enabled = onboard_control_sensors_enabled | |
self.onboard_control_sensors_health = onboard_control_sensors_health | |
self.load = load | |
self.voltage_battery = voltage_battery | |
self.current_battery = current_battery | |
self.battery_remaining = battery_remaining | |
self.drop_rate_comm = drop_rate_comm | |
self.errors_comm = errors_comm | |
self.errors_count1 = errors_count1 | |
self.errors_count2 = errors_count2 | |
self.errors_count3 = errors_count3 | |
self.errors_count4 = errors_count4 | |
def pack(self, mav, force_mavlink1=False): | |
return MAVLink_message.pack(self, mav, 124, struct.pack('<IIIHHhHHHHHHb', self.onboard_control_sensors_present, self.onboard_control_sensors_enabled, self.onboard_control_sensors_health, self.load, self.voltage_battery, self.current_battery, self.drop_rate_comm, self.errors_comm, self.errors_count1, self.errors_count2, self.errors_count3, self.errors_count4, self.battery_remaining), force_mavlink1=force_mavlink1) | |
class MAVLink_system_time_message(MAVLink_message): | |
''' | |
The system time is the time of the master clock, typically the | |
computer clock of the main onboard computer. | |
''' | |
id = MAVLINK_MSG_ID_SYSTEM_TIME | |
name = 'SYSTEM_TIME' | |
fieldnames = ['time_unix_usec', 'time_boot_ms'] | |
ordered_fieldnames = ['time_unix_usec', 'time_boot_ms'] | |
fieldtypes = ['uint64_t', 'uint32_t'] | |
fielddisplays_by_name = {} | |
fieldenums_by_name = {} | |
fieldunits_by_name = {} | |
format = '<QI' | |
native_format = bytearray('<QI', 'ascii') | |
orders = [0, 1] | |
lengths = [1, 1] | |
array_lengths = [0, 0] | |
crc_extra = 137 | |
unpacker = struct.Struct('<QI') | |
instance_field = None | |
instance_offset = -1 | |
def __init__(self, time_unix_usec, time_boot_ms): | |
MAVLink_message.__init__(self, MAVLink_system_time_message.id, MAVLink_system_time_message.name) | |
self._fieldnames = MAVLink_system_time_message.fieldnames | |
self._instance_field = MAVLink_system_time_message.instance_field | |
self._instance_offset = MAVLink_system_time_message.instance_offset | |
self.time_unix_usec = time_unix_usec | |
self.time_boot_ms = time_boot_ms | |
def pack(self, mav, force_mavlink1=False): | |
return MAVLink_message.pack(self, mav, 137, struct.pack('<QI', self.time_unix_usec, self.time_boot_ms), force_mavlink1=force_mavlink1) | |
class MAVLink_ping_message(MAVLink_message): | |
''' | |
A ping message either requesting or responding to a ping. This | |
allows to measure the system latencies, including serial port, | |
radio modem and UDP connections. | |
''' | |
id = MAVLINK_MSG_ID_PING | |
name = 'PING' | |
fieldnames = ['time_usec', 'seq', 'target_system', 'target_component'] | |
ordered_fieldnames = ['time_usec', 'seq', 'target_system', 'target_component'] | |
fieldtypes = ['uint64_t', 'uint32_t', 'uint8_t', 'uint8_t'] | |
fielddisplays_by_name = {} | |
fieldenums_by_name = {} | |
fieldunits_by_name = {} | |
format = '<QIBB' | |
native_format = bytearray('<QIBB', 'ascii') | |
orders = [0, 1, 2, 3] | |
lengths = [1, 1, 1, 1] | |
array_lengths = [0, 0, 0, 0] | |
crc_extra = 237 | |
unpacker = struct.Struct('<QIBB') | |
instance_field = None | |
instance_offset = -1 | |
def __init__(self, time_usec, seq, target_system, target_component): | |
MAVLink_message.__init__(self, MAVLink_ping_message.id, MAVLink_ping_message.name) | |
self._fieldnames = MAVLink_ping_message.fieldnames | |
self._instance_field = MAVLink_ping_message.instance_field | |
self._instance_offset = MAVLink_ping_message.instance_offset | |
self.time_usec = time_usec | |
self.seq = seq | |
self.target_system = target_system | |
self.target_component = target_component | |
def pack(self, mav, force_mavlink1=False): | |
return MAVLink_message.pack(self, mav, 237, struct.pack('<QIBB', self.time_usec, self.seq, self.target_system, self.target_component), force_mavlink1=force_mavlink1) | |
mavlink_map = { | |
MAVLINK_MSG_ID_HEARTBEAT : MAVLink_heartbeat_message, | |
MAVLINK_MSG_ID_SYS_STATUS : MAVLink_sys_status_message, | |
MAVLINK_MSG_ID_SYSTEM_TIME : MAVLink_system_time_message, | |
MAVLINK_MSG_ID_PING : MAVLink_ping_message, | |
} | |
class MAVError(Exception): | |
'''MAVLink error class''' | |
def __init__(self, msg): | |
Exception.__init__(self, msg) | |
self.message = msg | |
class MAVString(str): | |
'''NUL terminated string''' | |
def __init__(self, s): | |
str.__init__(self) | |
def __str__(self): | |
i = self.find(chr(0)) | |
if i == -1: | |
return self[:] | |
return self[0:i] | |
class MAVLink_bad_data(MAVLink_message): | |
''' | |
a piece of bad data in a mavlink stream | |
''' | |
def __init__(self, data, reason): | |
MAVLink_message.__init__(self, MAVLINK_MSG_ID_BAD_DATA, 'BAD_DATA') | |
self._fieldnames = ['data', 'reason'] | |
self.data = data | |
self.reason = reason | |
self._msgbuf = data | |
self._instance_field = None | |
def __str__(self): | |
'''Override the __str__ function from MAVLink_messages because non-printable characters are common in to be the reason for this message to exist.''' | |
return '%s {%s, data:%s}' % (self._type, self.reason, [('%x' % ord(i) if isinstance(i, str) else '%x' % i) for i in self.data]) | |
class MAVLink_unknown(MAVLink_message): | |
''' | |
a message that we don't have in the XML used when built | |
''' | |
def __init__(self, msgid, data): | |
MAVLink_message.__init__(self, MAVLINK_MSG_ID_UNKNOWN, 'UNKNOWN_%u' % msgid) | |
self._fieldnames = ['data'] | |
self.data = data | |
self._msgbuf = data | |
self._instance_field = None | |
def __str__(self): | |
'''Override the __str__ function from MAVLink_messages because non-printable characters are common.''' | |
return '%s {data:%s}' % (self._type, [('%x' % ord(i) if isinstance(i, str) else '%x' % i) for i in self.data]) | |
class MAVLinkSigning(object): | |
'''MAVLink signing state class''' | |
def __init__(self): | |
self.secret_key = None | |
self.timestamp = 0 | |
self.link_id = 0 | |
self.sign_outgoing = False | |
self.allow_unsigned_callback = None | |
self.stream_timestamps = {} | |
self.sig_count = 0 | |
self.badsig_count = 0 | |
self.goodsig_count = 0 | |
self.unsigned_count = 0 | |
self.reject_count = 0 | |
class MAVLink(object): | |
'''MAVLink protocol handling class''' | |
def __init__(self, file, srcSystem=0, srcComponent=0, use_native=False): | |
self.seq = 0 | |
self.file = file | |
self.srcSystem = srcSystem | |
self.srcComponent = srcComponent | |
self.callback = None | |
self.callback_args = None | |
self.callback_kwargs = None | |
self.send_callback = None | |
self.send_callback_args = None | |
self.send_callback_kwargs = None | |
self.buf = bytearray() | |
self.buf_index = 0 | |
self.expected_length = HEADER_LEN_V1+2 | |
self.have_prefix_error = False | |
self.robust_parsing = False | |
self.protocol_marker = 253 | |
self.little_endian = True | |
self.crc_extra = True | |
self.sort_fields = True | |
self.total_packets_sent = 0 | |
self.total_bytes_sent = 0 | |
self.total_packets_received = 0 | |
self.total_bytes_received = 0 | |
self.total_receive_errors = 0 | |
self.startup_time = time.time() | |
self.signing = MAVLinkSigning() | |
if native_supported and (use_native or native_testing or native_force): | |
print("NOTE: mavnative is currently beta-test code") | |
self.native = mavnative.NativeConnection(MAVLink_message, mavlink_map) | |
else: | |
self.native = None | |
if native_testing: | |
self.test_buf = bytearray() | |
self.mav20_unpacker = struct.Struct('<cBBBBBBHB') | |
self.mav10_unpacker = struct.Struct('<cBBBBB') | |
self.mav20_h3_unpacker = struct.Struct('BBB') | |
self.mav_csum_unpacker = struct.Struct('<H') | |
self.mav_sign_unpacker = struct.Struct('<IH') | |
def set_callback(self, callback, *args, **kwargs): | |
self.callback = callback | |
self.callback_args = args | |
self.callback_kwargs = kwargs | |
def set_send_callback(self, callback, *args, **kwargs): | |
self.send_callback = callback | |
self.send_callback_args = args | |
self.send_callback_kwargs = kwargs | |
def send(self, mavmsg, force_mavlink1=False): | |
'''send a MAVLink message''' | |
buf = mavmsg.pack(self, force_mavlink1=force_mavlink1) | |
self.file.write(buf) | |
self.seq = (self.seq + 1) % 256 | |
self.total_packets_sent += 1 | |
self.total_bytes_sent += len(buf) | |
if self.send_callback: | |
self.send_callback(mavmsg, *self.send_callback_args, **self.send_callback_kwargs) | |
def buf_len(self): | |
return len(self.buf) - self.buf_index | |
def bytes_needed(self): | |
'''return number of bytes needed for next parsing stage''' | |
if self.native: | |
ret = self.native.expected_length - self.buf_len() | |
else: | |
ret = self.expected_length - self.buf_len() | |
if ret <= 0: | |
return 1 | |
return ret | |
def __parse_char_native(self, c): | |
'''this method exists only to see in profiling results''' | |
m = self.native.parse_chars(c) | |
return m | |
def __callbacks(self, msg): | |
'''this method exists only to make profiling results easier to read''' | |
if self.callback: | |
self.callback(msg, *self.callback_args, **self.callback_kwargs) | |
def parse_char(self, c): | |
'''input some data bytes, possibly returning a new message''' | |
self.buf.extend(c) | |
self.total_bytes_received += len(c) | |
if self.native: | |
if native_testing: | |
self.test_buf.extend(c) | |
m = self.__parse_char_native(self.test_buf) | |
m2 = self.__parse_char_legacy() | |
if m2 != m: | |
print("Native: %s\nLegacy: %s\n" % (m, m2)) | |
raise Exception('Native vs. Legacy mismatch') | |
else: | |
m = self.__parse_char_native(self.buf) | |
else: | |
m = self.__parse_char_legacy() | |
if m is not None: | |
self.total_packets_received += 1 | |
self.__callbacks(m) | |
else: | |
# XXX The idea here is if we've read something and there's nothing left in | |
# the buffer, reset it to 0 which frees the memory | |
if self.buf_len() == 0 and self.buf_index != 0: | |
self.buf = bytearray() | |
self.buf_index = 0 | |
return m | |
def __parse_char_legacy(self): | |
'''input some data bytes, possibly returning a new message (uses no native code)''' | |
header_len = HEADER_LEN_V1 | |
if self.buf_len() >= 1 and self.buf[self.buf_index] == PROTOCOL_MARKER_V2: | |
header_len = HEADER_LEN_V2 | |
if self.buf_len() >= 1 and self.buf[self.buf_index] != PROTOCOL_MARKER_V1 and self.buf[self.buf_index] != PROTOCOL_MARKER_V2: | |
magic = self.buf[self.buf_index] | |
self.buf_index += 1 | |
if self.robust_parsing: | |
m = MAVLink_bad_data(bytearray([magic]), 'Bad prefix') | |
self.expected_length = header_len+2 | |
self.total_receive_errors += 1 | |
return m | |
if self.have_prefix_error: | |
return None | |
self.have_prefix_error = True | |
self.total_receive_errors += 1 | |
raise MAVError("invalid MAVLink prefix '%s'" % magic) | |
self.have_prefix_error = False | |
if self.buf_len() >= 3: | |
sbuf = self.buf[self.buf_index:3+self.buf_index] | |
if sys.version_info.major < 3: | |
sbuf = str(sbuf) | |
(magic, self.expected_length, incompat_flags) = self.mav20_h3_unpacker.unpack(sbuf) | |
if magic == PROTOCOL_MARKER_V2 and (incompat_flags & MAVLINK_IFLAG_SIGNED): | |
self.expected_length += MAVLINK_SIGNATURE_BLOCK_LEN | |
self.expected_length += header_len + 2 | |
if self.expected_length >= (header_len+2) and self.buf_len() >= self.expected_length: | |
mbuf = array.array('B', self.buf[self.buf_index:self.buf_index+self.expected_length]) | |
self.buf_index += self.expected_length | |
self.expected_length = header_len+2 | |
if self.robust_parsing: | |
try: | |
if magic == PROTOCOL_MARKER_V2 and (incompat_flags & ~MAVLINK_IFLAG_SIGNED) != 0: | |
raise MAVError('invalid incompat_flags 0x%x 0x%x %u' % (incompat_flags, magic, self.expected_length)) | |
m = self.decode(mbuf) | |
except MAVError as reason: | |
m = MAVLink_bad_data(mbuf, reason.message) | |
self.total_receive_errors += 1 | |
else: | |
if magic == PROTOCOL_MARKER_V2 and (incompat_flags & ~MAVLINK_IFLAG_SIGNED) != 0: | |
raise MAVError('invalid incompat_flags 0x%x 0x%x %u' % (incompat_flags, magic, self.expected_length)) | |
m = self.decode(mbuf) | |
return m | |
return None | |
def parse_buffer(self, s): | |
'''input some data bytes, possibly returning a list of new messages''' | |
m = self.parse_char(s) | |
if m is None: | |
return None | |
ret = [m] | |
while True: | |
m = self.parse_char("") | |
if m is None: | |
return ret | |
ret.append(m) | |
return ret | |
def check_signature(self, msgbuf, srcSystem, srcComponent): | |
'''check signature on incoming message''' | |
if isinstance(msgbuf, array.array): | |
try: | |
msgbuf = msgbuf.tostring() | |
except: | |
msgbuf = msgbuf.tobytes() | |
timestamp_buf = msgbuf[-12:-6] | |
link_id = msgbuf[-13] | |
(tlow, thigh) = self.mav_sign_unpacker.unpack(timestamp_buf) | |
timestamp = tlow + (thigh<<32) | |
# see if the timestamp is acceptable | |
stream_key = (link_id,srcSystem,srcComponent) | |
if stream_key in self.signing.stream_timestamps: | |
if timestamp <= self.signing.stream_timestamps[stream_key]: | |
# reject old timestamp | |
# print('old timestamp') | |
return False | |
else: | |
# a new stream has appeared. Accept the timestamp if it is at most | |
# one minute behind our current timestamp | |
if timestamp + 6000*1000 < self.signing.timestamp: | |
# print('bad new stream ', timestamp/(100.0*1000*60*60*24*365), self.signing.timestamp/(100.0*1000*60*60*24*365)) | |
return False | |
self.signing.stream_timestamps[stream_key] = timestamp | |
# print('new stream') | |
h = hashlib.new('sha256') | |
h.update(self.signing.secret_key) | |
h.update(msgbuf[:-6]) | |
if str(type(msgbuf)) == "<class 'bytes'>" or str(type(msgbuf)) == "<class 'bytearray'>": | |
# Python 3 | |
sig1 = h.digest()[:6] | |
sig2 = msgbuf[-6:] | |
else: | |
sig1 = str(h.digest())[:6] | |
sig2 = str(msgbuf)[-6:] | |
if sig1 != sig2: | |
# print('sig mismatch') | |
return False | |
# the timestamp we next send with is the max of the received timestamp and | |
# our current timestamp | |
self.signing.timestamp = max(self.signing.timestamp, timestamp) | |
return True | |
def decode(self, msgbuf): | |
'''decode a buffer as a MAVLink message''' | |
# decode the header | |
if msgbuf[0] != PROTOCOL_MARKER_V1: | |
headerlen = 10 | |
try: | |
magic, mlen, incompat_flags, compat_flags, seq, srcSystem, srcComponent, msgIdlow, msgIdhigh = self.mav20_unpacker.unpack(msgbuf[:headerlen]) | |
except struct.error as emsg: | |
raise MAVError('Unable to unpack MAVLink header: %s' % emsg) | |
msgId = msgIdlow | (msgIdhigh<<16) | |
mapkey = msgId | |
else: | |
headerlen = 6 | |
try: | |
magic, mlen, seq, srcSystem, srcComponent, msgId = self.mav10_unpacker.unpack(msgbuf[:headerlen]) | |
incompat_flags = 0 | |
compat_flags = 0 | |
except struct.error as emsg: | |
raise MAVError('Unable to unpack MAVLink header: %s' % emsg) | |
mapkey = msgId | |
if (incompat_flags & MAVLINK_IFLAG_SIGNED) != 0: | |
signature_len = MAVLINK_SIGNATURE_BLOCK_LEN | |
else: | |
signature_len = 0 | |
if ord(magic) != PROTOCOL_MARKER_V1 and ord(magic) != PROTOCOL_MARKER_V2: | |
raise MAVError("invalid MAVLink prefix '%s'" % magic) | |
if mlen != len(msgbuf)-(headerlen+2+signature_len): | |
raise MAVError('invalid MAVLink message length. Got %u expected %u, msgId=%u headerlen=%u' % (len(msgbuf)-(headerlen+2+signature_len), mlen, msgId, headerlen)) | |
if not mapkey in mavlink_map: | |
return MAVLink_unknown(msgId, msgbuf) | |
# decode the payload | |
type = mavlink_map[mapkey] | |
fmt = type.format | |
order_map = type.orders | |
len_map = type.lengths | |
crc_extra = type.crc_extra | |
# decode the checksum | |
try: | |
crc, = self.mav_csum_unpacker.unpack(msgbuf[-(2+signature_len):][:2]) | |
except struct.error as emsg: | |
raise MAVError('Unable to unpack MAVLink CRC: %s' % emsg) | |
crcbuf = msgbuf[1:-(2+signature_len)] | |
if True: # using CRC extra | |
crcbuf.append(crc_extra) | |
crc2 = x25crc(crcbuf) | |
if crc != crc2.crc and not MAVLINK_IGNORE_CRC: | |
raise MAVError('invalid MAVLink CRC in msgID %u 0x%04x should be 0x%04x' % (msgId, crc, crc2.crc)) | |
sig_ok = False | |
if signature_len == MAVLINK_SIGNATURE_BLOCK_LEN: | |
self.signing.sig_count += 1 | |
if self.signing.secret_key is not None: | |
accept_signature = False | |
if signature_len == MAVLINK_SIGNATURE_BLOCK_LEN: | |
sig_ok = self.check_signature(msgbuf, srcSystem, srcComponent) | |
accept_signature = sig_ok | |
if sig_ok: | |
self.signing.goodsig_count += 1 | |
else: | |
self.signing.badsig_count += 1 | |
if not accept_signature and self.signing.allow_unsigned_callback is not None: | |
accept_signature = self.signing.allow_unsigned_callback(self, msgId) | |
if accept_signature: | |
self.signing.unsigned_count += 1 | |
else: | |
self.signing.reject_count += 1 | |
elif self.signing.allow_unsigned_callback is not None: | |
accept_signature = self.signing.allow_unsigned_callback(self, msgId) | |
if accept_signature: | |
self.signing.unsigned_count += 1 | |
else: | |
self.signing.reject_count += 1 | |
if not accept_signature: | |
raise MAVError('Invalid signature') | |
csize = type.unpacker.size | |
mbuf = msgbuf[headerlen:-(2+signature_len)] | |
if len(mbuf) < csize: | |
# zero pad to give right size | |
mbuf.extend([0]*(csize - len(mbuf))) | |
if len(mbuf) < csize: | |
raise MAVError('Bad message of type %s length %u needs %s' % ( | |
type, len(mbuf), csize)) | |
mbuf = mbuf[:csize] | |
try: | |
t = type.unpacker.unpack(mbuf) | |
except struct.error as emsg: | |
raise MAVError('Unable to unpack MAVLink payload type=%s fmt=%s payloadLength=%u: %s' % ( | |
type, fmt, len(mbuf), emsg)) | |
tlist = list(t) | |
# handle sorted fields | |
if True: | |
t = tlist[:] | |
if sum(len_map) == len(len_map): | |
# message has no arrays in it | |
for i in range(0, len(tlist)): | |
tlist[i] = t[order_map[i]] | |
else: | |
# message has some arrays | |
tlist = [] | |
for i in range(0, len(order_map)): | |
order = order_map[i] | |
L = len_map[order] | |
tip = sum(len_map[:order]) | |
field = t[tip] | |
if L == 1 or isinstance(field, str): | |
tlist.append(field) | |
else: | |
tlist.append(t[tip:(tip + L)]) | |
# terminate any strings | |
for i in range(0, len(tlist)): | |
if type.fieldtypes[i] == 'char': | |
if sys.version_info.major >= 3: | |
tlist[i] = to_string(tlist[i]) | |
tlist[i] = str(MAVString(tlist[i])) | |
t = tuple(tlist) | |
# construct the message object | |
try: | |
m = type(*t) | |
except Exception as emsg: | |
raise MAVError('Unable to instantiate MAVLink message of type %s : %s' % (type, emsg)) | |
m._signed = sig_ok | |
if m._signed: | |
m._link_id = msgbuf[-13] | |
m._msgbuf = msgbuf | |
m._payload = msgbuf[6:-(2+signature_len)] | |
m._crc = crc | |
m._header = MAVLink_header(msgId, incompat_flags, compat_flags, mlen, seq, srcSystem, srcComponent) | |
return m | |
def heartbeat_encode(self, type, autopilot, base_mode, custom_mode, system_status, mavlink_version=3): | |
''' | |
The heartbeat message shows that a system is present and responding. | |
The type of the MAV and Autopilot hardware allow the | |
receiving system to treat further messages from this | |
system appropriate (e.g. by laying out the user | |
interface based on the autopilot). | |
type : Type of the MAV (quadrotor, helicopter, etc., up to 15 types, defined in MAV_TYPE ENUM) (type:uint8_t) | |
autopilot : Autopilot type / class. defined in MAV_AUTOPILOT ENUM (type:uint8_t) | |
base_mode : System mode bitfield, see MAV_MODE_FLAG ENUM in mavlink/include/mavlink_types.h (type:uint8_t) | |
custom_mode : A bitfield for use for autopilot-specific flags. (type:uint32_t) | |
system_status : System status flag, see MAV_STATE ENUM (type:uint8_t) | |
mavlink_version : MAVLink version, not writable by user, gets added by protocol because of magic data type: uint8_t_mavlink_version (type:uint8_t) | |
''' | |
return MAVLink_heartbeat_message(type, autopilot, base_mode, custom_mode, system_status, mavlink_version) | |
def heartbeat_send(self, type, autopilot, base_mode, custom_mode, system_status, mavlink_version=3, force_mavlink1=False): | |
''' | |
The heartbeat message shows that a system is present and responding. | |
The type of the MAV and Autopilot hardware allow the | |
receiving system to treat further messages from this | |
system appropriate (e.g. by laying out the user | |
interface based on the autopilot). | |
type : Type of the MAV (quadrotor, helicopter, etc., up to 15 types, defined in MAV_TYPE ENUM) (type:uint8_t) | |
autopilot : Autopilot type / class. defined in MAV_AUTOPILOT ENUM (type:uint8_t) | |
base_mode : System mode bitfield, see MAV_MODE_FLAG ENUM in mavlink/include/mavlink_types.h (type:uint8_t) | |
custom_mode : A bitfield for use for autopilot-specific flags. (type:uint32_t) | |
system_status : System status flag, see MAV_STATE ENUM (type:uint8_t) | |
mavlink_version : MAVLink version, not writable by user, gets added by protocol because of magic data type: uint8_t_mavlink_version (type:uint8_t) | |
''' | |
return self.send(self.heartbeat_encode(type, autopilot, base_mode, custom_mode, system_status, mavlink_version), force_mavlink1=force_mavlink1) | |
def sys_status_encode(self, onboard_control_sensors_present, onboard_control_sensors_enabled, onboard_control_sensors_health, load, voltage_battery, current_battery, battery_remaining, drop_rate_comm, errors_comm, errors_count1, errors_count2, errors_count3, errors_count4): | |
''' | |
The general system state. If the system is following the MAVLink | |
standard, the system state is mainly defined by three | |
orthogonal states/modes: The system mode, which is | |
either LOCKED (motors shut down and locked), MANUAL | |
(system under RC control), GUIDED (system with | |
autonomous position control, position setpoint | |
controlled manually) or AUTO (system guided by | |
path/waypoint planner). The NAV_MODE defined the | |
current flight state: LIFTOFF (often an open-loop | |
maneuver), LANDING, WAYPOINTS or VECTOR. This | |
represents the internal navigation state machine. The | |
system status shows wether the system is currently | |
active or not and if an emergency occured. During the | |
CRITICAL and EMERGENCY states the MAV is still | |
considered to be active, but should start emergency | |
procedures autonomously. After a failure occured it | |
should first move from active to critical to allow | |
manual intervention and then move to emergency after a | |
certain timeout. | |
onboard_control_sensors_present : Bitmask showing which onboard controllers and sensors are present. Value of 0: not present. Value of 1: present. Indices defined by ENUM MAV_SYS_STATUS_SENSOR (type:uint32_t) | |
onboard_control_sensors_enabled : Bitmask showing which onboard controllers and sensors are enabled: Value of 0: not enabled. Value of 1: enabled. Indices defined by ENUM MAV_SYS_STATUS_SENSOR (type:uint32_t) | |
onboard_control_sensors_health : Bitmask showing which onboard controllers and sensors are operational or have an error: Value of 0: not enabled. Value of 1: enabled. Indices defined by ENUM MAV_SYS_STATUS_SENSOR (type:uint32_t) | |
load : Maximum usage in percent of the mainloop time, (0%: 0, 100%: 1000) should be always below 1000 (type:uint16_t) | |
voltage_battery : Battery voltage, in millivolts (1 = 1 millivolt) (type:uint16_t) | |
current_battery : Battery current, in 10*milliamperes (1 = 10 milliampere), -1: autopilot does not measure the current (type:int16_t) | |
battery_remaining : Remaining battery energy: (0%: 0, 100%: 100), -1: autopilot estimate the remaining battery (type:int8_t) | |
drop_rate_comm : Communication drops in percent, (0%: 0, 100%: 10'000), (UART, I2C, SPI, CAN), dropped packets on all links (packets that were corrupted on reception on the MAV) (type:uint16_t) | |
errors_comm : Communication errors (UART, I2C, SPI, CAN), dropped packets on all links (packets that were corrupted on reception on the MAV) (type:uint16_t) | |
errors_count1 : Autopilot-specific errors (type:uint16_t) | |
errors_count2 : Autopilot-specific errors (type:uint16_t) | |
errors_count3 : Autopilot-specific errors (type:uint16_t) | |
errors_count4 : Autopilot-specific errors (type:uint16_t) | |
''' | |
return MAVLink_sys_status_message(onboard_control_sensors_present, onboard_control_sensors_enabled, onboard_control_sensors_health, load, voltage_battery, current_battery, battery_remaining, drop_rate_comm, errors_comm, errors_count1, errors_count2, errors_count3, errors_count4) | |
def sys_status_send(self, onboard_control_sensors_present, onboard_control_sensors_enabled, onboard_control_sensors_health, load, voltage_battery, current_battery, battery_remaining, drop_rate_comm, errors_comm, errors_count1, errors_count2, errors_count3, errors_count4, force_mavlink1=False): | |
''' | |
The general system state. If the system is following the MAVLink | |
standard, the system state is mainly defined by three | |
orthogonal states/modes: The system mode, which is | |
either LOCKED (motors shut down and locked), MANUAL | |
(system under RC control), GUIDED (system with | |
autonomous position control, position setpoint | |
controlled manually) or AUTO (system guided by | |
path/waypoint planner). The NAV_MODE defined the | |
current flight state: LIFTOFF (often an open-loop | |
maneuver), LANDING, WAYPOINTS or VECTOR. This | |
represents the internal navigation state machine. The | |
system status shows wether the system is currently | |
active or not and if an emergency occured. During the | |
CRITICAL and EMERGENCY states the MAV is still | |
considered to be active, but should start emergency | |
procedures autonomously. After a failure occured it | |
should first move from active to critical to allow | |
manual intervention and then move to emergency after a | |
certain timeout. | |
onboard_control_sensors_present : Bitmask showing which onboard controllers and sensors are present. Value of 0: not present. Value of 1: present. Indices defined by ENUM MAV_SYS_STATUS_SENSOR (type:uint32_t) | |
onboard_control_sensors_enabled : Bitmask showing which onboard controllers and sensors are enabled: Value of 0: not enabled. Value of 1: enabled. Indices defined by ENUM MAV_SYS_STATUS_SENSOR (type:uint32_t) | |
onboard_control_sensors_health : Bitmask showing which onboard controllers and sensors are operational or have an error: Value of 0: not enabled. Value of 1: enabled. Indices defined by ENUM MAV_SYS_STATUS_SENSOR (type:uint32_t) | |
load : Maximum usage in percent of the mainloop time, (0%: 0, 100%: 1000) should be always below 1000 (type:uint16_t) | |
voltage_battery : Battery voltage, in millivolts (1 = 1 millivolt) (type:uint16_t) | |
current_battery : Battery current, in 10*milliamperes (1 = 10 milliampere), -1: autopilot does not measure the current (type:int16_t) | |
battery_remaining : Remaining battery energy: (0%: 0, 100%: 100), -1: autopilot estimate the remaining battery (type:int8_t) | |
drop_rate_comm : Communication drops in percent, (0%: 0, 100%: 10'000), (UART, I2C, SPI, CAN), dropped packets on all links (packets that were corrupted on reception on the MAV) (type:uint16_t) | |
errors_comm : Communication errors (UART, I2C, SPI, CAN), dropped packets on all links (packets that were corrupted on reception on the MAV) (type:uint16_t) | |
errors_count1 : Autopilot-specific errors (type:uint16_t) | |
errors_count2 : Autopilot-specific errors (type:uint16_t) | |
errors_count3 : Autopilot-specific errors (type:uint16_t) | |
errors_count4 : Autopilot-specific errors (type:uint16_t) | |
''' | |
return self.send(self.sys_status_encode(onboard_control_sensors_present, onboard_control_sensors_enabled, onboard_control_sensors_health, load, voltage_battery, current_battery, battery_remaining, drop_rate_comm, errors_comm, errors_count1, errors_count2, errors_count3, errors_count4), force_mavlink1=force_mavlink1) | |
def system_time_encode(self, time_unix_usec, time_boot_ms): | |
''' | |
The system time is the time of the master clock, typically the | |
computer clock of the main onboard computer. | |
time_unix_usec : Timestamp of the master clock in microseconds since UNIX epoch. (type:uint64_t) | |
time_boot_ms : Timestamp of the component clock since boot time in milliseconds. (type:uint32_t) | |
''' | |
return MAVLink_system_time_message(time_unix_usec, time_boot_ms) | |
def system_time_send(self, time_unix_usec, time_boot_ms, force_mavlink1=False): | |
''' | |
The system time is the time of the master clock, typically the | |
computer clock of the main onboard computer. | |
time_unix_usec : Timestamp of the master clock in microseconds since UNIX epoch. (type:uint64_t) | |
time_boot_ms : Timestamp of the component clock since boot time in milliseconds. (type:uint32_t) | |
''' | |
return self.send(self.system_time_encode(time_unix_usec, time_boot_ms), force_mavlink1=force_mavlink1) | |
def ping_encode(self, time_usec, seq, target_system, target_component): | |
''' | |
A ping message either requesting or responding to a ping. This allows | |
to measure the system latencies, including serial | |
port, radio modem and UDP connections. | |
time_usec : Unix timestamp in microseconds or since system boot if smaller than MAVLink epoch (1.1.2009) (type:uint64_t) | |
seq : PING sequence (type:uint32_t) | |
target_system : 0: request ping from all receiving systems, if greater than 0: message is a ping response and number is the system id of the requesting system (type:uint8_t) | |
target_component : 0: request ping from all receiving components, if greater than 0: message is a ping response and number is the system id of the requesting system (type:uint8_t) | |
''' | |
return MAVLink_ping_message(time_usec, seq, target_system, target_component) | |
def ping_send(self, time_usec, seq, target_system, target_component, force_mavlink1=False): | |
''' | |
A ping message either requesting or responding to a ping. This allows | |
to measure the system latencies, including serial | |
port, radio modem and UDP connections. | |
time_usec : Unix timestamp in microseconds or since system boot if smaller than MAVLink epoch (1.1.2009) (type:uint64_t) | |
seq : PING sequence (type:uint32_t) | |
target_system : 0: request ping from all receiving systems, if greater than 0: message is a ping response and number is the system id of the requesting system (type:uint8_t) | |
target_component : 0: request ping from all receiving components, if greater than 0: message is a ping response and number is the system id of the requesting system (type:uint8_t) | |
''' | |
return self.send(self.ping_encode(time_usec, seq, target_system, target_component), force_mavlink1=force_mavlink1) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment