Skip to content

Instantly share code, notes, and snippets.

@alehed
Last active May 2, 2022 09:37
Show Gist options
  • Save alehed/ab70682d44fe4149a974a79d7f16d802 to your computer and use it in GitHub Desktop.
Save alehed/ab70682d44fe4149a974a79d7f16d802 to your computer and use it in GitHub Desktop.
mavlink_python output comparison (shows changes in whitespace and quotes)
"""
MAVLink protocol implementation (auto-generated by mavgen.py)
Generated from: common.xml
Note: this file has been auto-generated. DO NOT EDIT
"""
from __future__ import print_function
import array
import hashlib
import json
import os
import platform
import struct
import sys
import time
from builtins import object, range
from pymavlink.generator.mavcrc import x25crc
# Identity of this generated dialect module.
WIRE_PROTOCOL_VERSION = "2.0"
DIALECT = "after"

# Start-of-frame marker and header size for each wire format.  The sizes match
# the struct layouts used by MAVLink_header.pack().
PROTOCOL_MARKER_V1, HEADER_LEN_V1 = 0xFE, 6
PROTOCOL_MARKER_V2, HEADER_LEN_V2 = 0xFD, 10

# A signed frame carries 13 extra trailing bytes and sets this bit in its
# incompat_flags header field.
MAVLINK_SIGNATURE_BLOCK_LEN = 13
MAVLINK_IFLAG_SIGNED = 0x01

# Will force use of native code regardless of what client app wants
native_force = "MAVNATIVE_FORCE" in os.environ
# Will force both native and legacy code to be used and their results compared
native_testing = "MAVNATIVE_TESTING" in os.environ

# The native parser is only considered off Windows and for wire protocol
# versions up to 1; mavnative isn't supported for MAVLink2 yet.
native_supported = False
if platform.system() != "Windows" and float(WIRE_PROTOCOL_VERSION) <= 1:
    try:
        import mavnative
    except ImportError:
        print("ERROR LOADING MAVNATIVE - falling back to python implementation")
    else:
        native_supported = True
# Allow MAV_IGNORE_CRC=1 to ignore CRC failures, so that some corrupted
# messages can still be seen.  The value is parsed instead of being used raw:
# every non-empty string is truthy, so using it directly would make
# MAV_IGNORE_CRC=0 disable CRC checking as well.
MAVLINK_IGNORE_CRC = os.environ.get("MAV_IGNORE_CRC", "0").strip().lower() not in ("", "0", "false", "no", "off")
# some base types from mavlink_types.h, numbered consecutively from 0
(
    MAVLINK_TYPE_CHAR,
    MAVLINK_TYPE_UINT8_T,
    MAVLINK_TYPE_INT8_T,
    MAVLINK_TYPE_UINT16_T,
    MAVLINK_TYPE_INT16_T,
    MAVLINK_TYPE_UINT32_T,
    MAVLINK_TYPE_INT32_T,
    MAVLINK_TYPE_UINT64_T,
    MAVLINK_TYPE_INT64_T,
    MAVLINK_TYPE_FLOAT,
    MAVLINK_TYPE_DOUBLE,
) = range(11)
# swiped from DFReader.py
def to_string(s):
    """Convert *s* to text, tolerating malformed input.

    Args:
        s: a ``bytes``/``bytearray`` buffer (decoded as UTF-8) or a text
            string.

    Returns:
        The text form of *s*.  Text input is returned unchanged.  If a byte
        buffer is not valid UTF-8, the undecodable bytes are dropped and the
        marker ``"_XXX"`` is appended so the caller can see that the value was
        damaged.  Any other kind of object yields just ``"_XXX"``.
    """
    if isinstance(s, (bytes, bytearray)):
        try:
            return s.decode("utf-8")
        except UnicodeDecodeError:
            # so it's a nasty one. Let's grab as many characters as we can
            return s.decode("utf-8", "ignore") + "_XXX"
    # type(u"") is the text type on both Python 2 (unicode) and Python 3 (str)
    if isinstance(s, type(u"")):
        return s
    return "_XXX"
class MAVLink_header(object):
    """Header fields of one MAVLink frame, with serialization to wire bytes."""

    def __init__(self, msgId, incompat_flags=0, compat_flags=0, mlen=0, seq=0, srcSystem=0, srcComponent=0):
        # Message identity and the flag bytes that only the v2 header carries.
        self.msgId = msgId
        self.incompat_flags = incompat_flags
        self.compat_flags = compat_flags
        # Payload length, sequence number and sender address.
        self.mlen = mlen
        self.seq = seq
        self.srcSystem = srcSystem
        self.srcComponent = srcComponent

    def pack(self, force_mavlink1=False):
        """Return the packed header bytes.

        The 10-byte v2 layout is used when the module is built for wire
        protocol "2.0" and ``force_mavlink1`` is false; otherwise the 6-byte
        v1 layout is used.
        """
        use_v2 = WIRE_PROTOCOL_VERSION == "2.0" and not force_mavlink1
        if not use_v2:
            v1_fields = (
                PROTOCOL_MARKER_V1,
                self.mlen,
                self.seq,
                self.srcSystem,
                self.srcComponent,
                self.msgId,
            )
            return struct.pack("<BBBBBB", *v1_fields)

        # The message ID is split into a 16-bit low part and an 8-bit high part.
        v2_fields = (
            253,
            self.mlen,
            self.incompat_flags,
            self.compat_flags,
            self.seq,
            self.srcSystem,
            self.srcComponent,
            self.msgId & 0xFFFF,
            self.msgId >> 16,
        )
        return struct.pack("<BBBBBBBHB", *v2_fields)
class MAVLink_message(object):
    """Base class of all MAVLink messages.

    Holds the header, the packed payload/frame and the list of field names,
    and provides the formatting, comparison, signing and packing shared by
    every generated message class.
    """

    def __init__(self, msgId, name):
        self._header = MAVLink_header(msgId)
        self._payload = None
        self._msgbuf = None
        self._crc = None
        self._fieldnames = []
        self._type = name
        self._signed = False
        self._link_id = None
        self._instances = None
        self._instance_field = None

    def format_attr(self, field):
        """Return attribute *field*; bytes values are converted to text with trailing NULs removed."""
        raw_attr = getattr(self, field)
        if isinstance(raw_attr, bytes):
            raw_attr = to_string(raw_attr).rstrip("\x00")
        return raw_attr

    def get_msgbuf(self):
        """Return the packed frame as a bytearray (the stored object itself if it already is one)."""
        if isinstance(self._msgbuf, bytearray):
            return self._msgbuf
        return bytearray(self._msgbuf)

    def get_header(self):
        return self._header

    def get_payload(self):
        return self._payload

    def get_crc(self):
        return self._crc

    def get_fieldnames(self):
        return self._fieldnames

    def get_type(self):
        return self._type

    def get_msgId(self):
        return self._header.msgId

    def get_srcSystem(self):
        return self._header.srcSystem

    def get_srcComponent(self):
        return self._header.srcComponent

    def get_seq(self):
        return self._header.seq

    def get_signed(self):
        return self._signed

    def get_link_id(self):
        return self._link_id

    def __str__(self):
        """Return ``"TYPE {field : value, ...}"``; a message without fields gives ``"TYPE {}"``."""
        # join() instead of trimming a trailing ", ": the trim removed the
        # opening " {" when there were no fields at all.
        fields = ", ".join("%s : %s" % (a, self.format_attr(a)) for a in self._fieldnames)
        return "%s {%s}" % (self._type, fields)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __eq__(self, other):
        """Compare type, sequence number, source address and every field value.

        Anything that is not a MAVLink_message (including None) compares
        unequal instead of raising AttributeError.
        """
        if not isinstance(other, MAVLink_message):
            return False
        if self.get_type() != other.get_type():
            return False
        # We do not compare CRC because native code doesn't provide it
        if self.get_seq() != other.get_seq():
            return False
        if self.get_srcSystem() != other.get_srcSystem():
            return False
        if self.get_srcComponent() != other.get_srcComponent():
            return False
        return all(self.format_attr(a) == other.format_attr(a) for a in self._fieldnames)

    def to_dict(self):
        """Return the fields as a dict, plus the message type under "mavpackettype"."""
        d = {"mavpackettype": self._type}
        for a in self._fieldnames:
            d[a] = self.format_attr(a)
        return d

    def to_json(self):
        """Return to_dict() serialized as a JSON string."""
        return json.dumps(self.to_dict())

    def sign_packet(self, mav):
        """Append the 13-byte signature block to the packed frame and advance the signing timestamp.

        The block is the link ID, the low 6 bytes of the timestamp, and the
        first 6 bytes of SHA-256(secret_key + frame including link ID and
        timestamp).
        """
        h = hashlib.new("sha256")
        self._msgbuf += struct.pack("<BQ", mav.signing.link_id, mav.signing.timestamp)[:7]
        h.update(mav.signing.secret_key)
        h.update(self._msgbuf)
        sig = h.digest()[:6]
        self._msgbuf += sig
        mav.signing.timestamp += 1

    def pack(self, mav, crc_extra, payload, force_mavlink1=False):
        """Build the complete frame for this message and return it.

        Args:
            mav: the connection object supplying seq, srcSystem, srcComponent
                and the signing state.
            crc_extra: per-message byte folded into the checksum after the frame.
            payload: the packed field values.
            force_mavlink1: when true, use the v1 header and neither trim the
                payload nor sign the frame.

        Returns:
            The packed frame, which is also stored on the message.
        """
        plen = len(payload)
        if WIRE_PROTOCOL_VERSION != "1.0" and not force_mavlink1:
            # in MAVLink2 we can strip trailing zeros off payloads. This allows for simple
            # variable length arrays and smaller packets
            nullbyte = chr(0)
            # in Python2, type("fred") is str but also type("fred")==bytes,
            # which is why the type is compared by name rather than isinstance
            if str(type(payload)) == "<class 'bytes'>":
                nullbyte = 0
            # at least one payload byte is always kept
            while plen > 1 and payload[plen - 1] == nullbyte:
                plen -= 1
        self._payload = payload[:plen]
        incompat_flags = 0
        if mav.signing.sign_outgoing:
            incompat_flags |= MAVLINK_IFLAG_SIGNED
        self._header = MAVLink_header(
            self._header.msgId,
            incompat_flags=incompat_flags,
            compat_flags=0,
            mlen=len(self._payload),
            seq=mav.seq,
            srcSystem=mav.srcSystem,
            srcComponent=mav.srcComponent,
        )
        self._msgbuf = self._header.pack(force_mavlink1=force_mavlink1) + self._payload
        # the checksum covers everything after the start-of-frame marker, then CRC extra
        crc = x25crc(self._msgbuf[1:])
        crc.accumulate_str(struct.pack("B", crc_extra))
        self._crc = crc.crc
        self._msgbuf += struct.pack("<H", self._crc)
        if mav.signing.sign_outgoing and not force_mavlink1:
            self.sign_packet(mav)
        return self._msgbuf

    def __getitem__(self, key):
        """support indexing, allowing for multi-instance sensors in one message"""
        if self._instances is None:
            raise IndexError()
        if key not in self._instances:
            raise IndexError()
        return self._instances[key]
# enums
class EnumEntry(object):
    """One value of an enum: its name, its description and per-index parameter descriptions."""

    def __init__(self, name, description):
        self.name, self.description = name, description
        # filled in after construction, keyed by parameter index
        self.param = {}
# Registry of every enum in the dialect: enum name -> {value: EnumEntry}.
enums = {}

# MAV_AUTOPILOT
MAV_AUTOPILOT_GENERIC = 0  # Generic autopilot, full support for everything
MAV_AUTOPILOT_RESERVED = 1  # Reserved for future use.
MAV_AUTOPILOT_SLUGS = 2  # SLUGS autopilot, http://slugsuav.soe.ucsc.edu
MAV_AUTOPILOT_AEROB = 16  # Aerob -- http://aerob.ru
MAV_AUTOPILOT_ASLUAV = 17  # ASLUAV autopilot -- http://www.asl.ethz.ch
MAV_AUTOPILOT_ENUM_END = 18
enums["MAV_AUTOPILOT"] = {
    MAV_AUTOPILOT_GENERIC: EnumEntry("MAV_AUTOPILOT_GENERIC", """Generic autopilot, full support for everything"""),
    MAV_AUTOPILOT_RESERVED: EnumEntry("MAV_AUTOPILOT_RESERVED", """Reserved for future use."""),
    MAV_AUTOPILOT_SLUGS: EnumEntry("MAV_AUTOPILOT_SLUGS", """SLUGS autopilot, http://slugsuav.soe.ucsc.edu"""),
    MAV_AUTOPILOT_AEROB: EnumEntry("MAV_AUTOPILOT_AEROB", """Aerob -- http://aerob.ru"""),
    MAV_AUTOPILOT_ASLUAV: EnumEntry("MAV_AUTOPILOT_ASLUAV", """ASLUAV autopilot -- http://www.asl.ethz.ch"""),
    MAV_AUTOPILOT_ENUM_END: EnumEntry("MAV_AUTOPILOT_ENUM_END", """"""),
}
# MAV_TYPE
MAV_TYPE_GENERIC = 0  # Generic micro air vehicle.
MAV_TYPE_FIXED_WING = 1  # Fixed wing aircraft.
MAV_TYPE_QUADROTOR = 2  # Quadrotor
MAV_TYPE_COAXIAL = 3  # Coaxial helicopter
MAV_TYPE_HELICOPTER = 4  # Normal helicopter with tail rotor.
MAV_TYPE_VTOL_TILTROTOR = 21  # Tiltrotor VTOL
MAV_TYPE_VTOL_RESERVED2 = 22  # VTOL reserved 2
MAV_TYPE_VTOL_RESERVED3 = 23  # VTOL reserved 3
MAV_TYPE_VTOL_RESERVED4 = 24  # VTOL reserved 4
MAV_TYPE_ENUM_END = 25
enums["MAV_TYPE"] = {
    MAV_TYPE_GENERIC: EnumEntry("MAV_TYPE_GENERIC", """Generic micro air vehicle."""),
    MAV_TYPE_FIXED_WING: EnumEntry("MAV_TYPE_FIXED_WING", """Fixed wing aircraft."""),
    MAV_TYPE_QUADROTOR: EnumEntry("MAV_TYPE_QUADROTOR", """Quadrotor"""),
    MAV_TYPE_COAXIAL: EnumEntry("MAV_TYPE_COAXIAL", """Coaxial helicopter"""),
    MAV_TYPE_HELICOPTER: EnumEntry("MAV_TYPE_HELICOPTER", """Normal helicopter with tail rotor."""),
    MAV_TYPE_VTOL_TILTROTOR: EnumEntry("MAV_TYPE_VTOL_TILTROTOR", """Tiltrotor VTOL"""),
    MAV_TYPE_VTOL_RESERVED2: EnumEntry("MAV_TYPE_VTOL_RESERVED2", """VTOL reserved 2"""),
    MAV_TYPE_VTOL_RESERVED3: EnumEntry("MAV_TYPE_VTOL_RESERVED3", """VTOL reserved 3"""),
    MAV_TYPE_VTOL_RESERVED4: EnumEntry("MAV_TYPE_VTOL_RESERVED4", """VTOL reserved 4"""),
    MAV_TYPE_ENUM_END: EnumEntry("MAV_TYPE_ENUM_END", """"""),
}
# FIRMWARE_VERSION_TYPE
FIRMWARE_VERSION_TYPE_DEV = 0  # development release
FIRMWARE_VERSION_TYPE_ALPHA = 64  # alpha release
FIRMWARE_VERSION_TYPE_BETA = 128  # beta release
FIRMWARE_VERSION_TYPE_RC = 192  # release candidate
FIRMWARE_VERSION_TYPE_OFFICIAL = 255  # official stable release
FIRMWARE_VERSION_TYPE_ENUM_END = 256
enums["FIRMWARE_VERSION_TYPE"] = {
    FIRMWARE_VERSION_TYPE_DEV: EnumEntry("FIRMWARE_VERSION_TYPE_DEV", """development release"""),
    FIRMWARE_VERSION_TYPE_ALPHA: EnumEntry("FIRMWARE_VERSION_TYPE_ALPHA", """alpha release"""),
    FIRMWARE_VERSION_TYPE_BETA: EnumEntry("FIRMWARE_VERSION_TYPE_BETA", """beta release"""),
    FIRMWARE_VERSION_TYPE_RC: EnumEntry("FIRMWARE_VERSION_TYPE_RC", """release candidate"""),
    FIRMWARE_VERSION_TYPE_OFFICIAL: EnumEntry("FIRMWARE_VERSION_TYPE_OFFICIAL", """official stable release"""),
    FIRMWARE_VERSION_TYPE_ENUM_END: EnumEntry("FIRMWARE_VERSION_TYPE_ENUM_END", """"""),
}
# MAV_CMD
MAV_CMD_NAV_WAYPOINT = 16  # Navigate to MISSION.
MAV_CMD_NAV_GUIDED_ENABLE = 92  # hand control over to an external controller
MAV_CMD_DO_GO_AROUND = 191  # Mission command to safely abort an autonomous landing.
MAV_CMD_ENUM_END = 192
enums["MAV_CMD"] = {
    MAV_CMD_NAV_WAYPOINT: EnumEntry("MAV_CMD_NAV_WAYPOINT", """Navigate to MISSION."""),
    MAV_CMD_NAV_GUIDED_ENABLE: EnumEntry("MAV_CMD_NAV_GUIDED_ENABLE", """hand control over to an external controller"""),
    MAV_CMD_DO_GO_AROUND: EnumEntry("MAV_CMD_DO_GO_AROUND", """Mission command to safely abort an autonmous landing."""),
    MAV_CMD_ENUM_END: EnumEntry("MAV_CMD_ENUM_END", """"""),
}
# Descriptions of the seven command parameters, keyed by parameter index.
enums["MAV_CMD"][MAV_CMD_NAV_WAYPOINT].param.update(
    {
        1: """Hold time in decimal seconds. (ignored by fixed wing, time to stay at MISSION for rotary wing)""",
        2: """Acceptance radius in meters (if the sphere with this radius is hit, the MISSION counts as reached)""",
        3: """0 to pass through the WP, if > 0 radius in meters to pass by WP. Positive value for clockwise orbit, negative value for counter-clockwise orbit. Allows trajectory control.""",
        4: """Desired yaw angle at MISSION (rotary wing)""",
        5: """Latitude""",
        6: """Longitude""",
        7: """Altitude""",
    }
)
enums["MAV_CMD"][MAV_CMD_NAV_GUIDED_ENABLE].param.update(
    {
        1: """On / Off (> 0.5f on)""",
        2: """Empty""",
        3: """Empty""",
        4: """Empty""",
        5: """Empty""",
        6: """Empty""",
        7: """Empty""",
    }
)
enums["MAV_CMD"][MAV_CMD_DO_GO_AROUND].param.update(
    {
        1: """Altitude (meters)""",
        2: """Empty""",
        3: """Empty""",
        4: """Empty""",
        5: """Empty""",
        6: """Empty""",
        7: """Empty""",
    }
)
# message IDs
# IDs of the messages this dialect defines.
MAVLINK_MSG_ID_HEARTBEAT = 0
MAVLINK_MSG_ID_SYS_STATUS = 1
MAVLINK_MSG_ID_SYSTEM_TIME = 2
MAVLINK_MSG_ID_PING = 4
# Negative pseudo-IDs carried by MAVLink_bad_data and MAVLink_unknown.
MAVLINK_MSG_ID_BAD_DATA, MAVLINK_MSG_ID_UNKNOWN = -1, -2
class MAVLink_heartbeat_message(MAVLink_message):
    """
    HEARTBEAT: signals that a system is present and responding. The MAV
    type and autopilot type it carries let the receiving system handle
    later messages from this system appropriately (for example, by
    laying out the user interface for that autopilot).

    Fields are declared in `fieldnames` order but packed in
    `ordered_fieldnames` order, which puts `custom_mode` first.
    """

    id = MAVLINK_MSG_ID_HEARTBEAT
    name = "HEARTBEAT"
    fieldnames = ["type", "autopilot", "base_mode", "custom_mode", "system_status", "mavlink_version"]
    ordered_fieldnames = ["custom_mode", "type", "autopilot", "base_mode", "system_status", "mavlink_version"]
    fieldtypes = ["uint8_t", "uint8_t", "uint8_t", "uint32_t", "uint8_t", "uint8_t"]
    fielddisplays_by_name = {}
    fieldenums_by_name = {}
    fieldunits_by_name = {}
    format = "<IBBBBB"
    native_format = bytearray(format, "ascii")
    orders = [1, 2, 3, 0, 4, 5]
    lengths = [1] * 6
    array_lengths = [0] * 6
    crc_extra = 50
    unpacker = struct.Struct(format)
    instance_field = None
    instance_offset = -1

    def __init__(self, type, autopilot, base_mode, custom_mode, system_status, mavlink_version):
        cls = MAVLink_heartbeat_message
        MAVLink_message.__init__(self, cls.id, cls.name)
        self._fieldnames = cls.fieldnames
        self._instance_field = cls.instance_field
        self._instance_offset = cls.instance_offset
        self.type = type
        self.autopilot = autopilot
        self.base_mode = base_mode
        self.custom_mode = custom_mode
        self.system_status = system_status
        self.mavlink_version = mavlink_version

    def pack(self, mav, force_mavlink1=False):
        """Pack the fields in wire order and return the complete frame built by the base class."""
        payload = struct.pack(
            "<IBBBBB",
            self.custom_mode,
            self.type,
            self.autopilot,
            self.base_mode,
            self.system_status,
            self.mavlink_version,
        )
        return MAVLink_message.pack(self, mav, 50, payload, force_mavlink1=force_mavlink1)
class MAVLink_sys_status_message(MAVLink_message):
    """
    SYS_STATUS: the general system state. If the system follows the
    MAVLink standard, the system state is mainly defined by three
    orthogonal states/modes. The system mode is either LOCKED (motors
    shut down and locked), MANUAL (system under RC control), GUIDED
    (system with autonomous position control, position setpoint
    controlled manually) or AUTO (system guided by path/waypoint
    planner). The NAV_MODE defines the current flight state: LIFTOFF
    (often an open-loop maneuver), LANDING, WAYPOINTS or VECTOR; this
    represents the internal navigation state machine. The system status
    shows whether the system is currently active or not and whether an
    emergency occurred. During the CRITICAL and EMERGENCY states the
    MAV is still considered to be active, but should start emergency
    procedures autonomously. After a failure occurred it should first
    move from active to critical to allow manual intervention and then
    move to emergency after a certain timeout.

    Fields are declared in `fieldnames` order but packed in
    `ordered_fieldnames` order, which puts `battery_remaining` last.
    """

    id = MAVLINK_MSG_ID_SYS_STATUS
    name = "SYS_STATUS"
    fieldnames = ["onboard_control_sensors_present", "onboard_control_sensors_enabled", "onboard_control_sensors_health", "load", "voltage_battery", "current_battery", "battery_remaining", "drop_rate_comm", "errors_comm", "errors_count1", "errors_count2", "errors_count3", "errors_count4"]
    ordered_fieldnames = ["onboard_control_sensors_present", "onboard_control_sensors_enabled", "onboard_control_sensors_health", "load", "voltage_battery", "current_battery", "drop_rate_comm", "errors_comm", "errors_count1", "errors_count2", "errors_count3", "errors_count4", "battery_remaining"]
    fieldtypes = ["uint32_t", "uint32_t", "uint32_t", "uint16_t", "uint16_t", "int16_t", "int8_t", "uint16_t", "uint16_t", "uint16_t", "uint16_t", "uint16_t", "uint16_t"]
    fielddisplays_by_name = {}
    fieldenums_by_name = {}
    fieldunits_by_name = {}
    format = "<IIIHHhHHHHHHb"
    native_format = bytearray(format, "ascii")
    orders = [0, 1, 2, 3, 4, 5, 12, 6, 7, 8, 9, 10, 11]
    lengths = [1] * 13
    array_lengths = [0] * 13
    crc_extra = 124
    unpacker = struct.Struct(format)
    instance_field = None
    instance_offset = -1

    def __init__(self, onboard_control_sensors_present, onboard_control_sensors_enabled, onboard_control_sensors_health, load, voltage_battery, current_battery, battery_remaining, drop_rate_comm, errors_comm, errors_count1, errors_count2, errors_count3, errors_count4):
        cls = MAVLink_sys_status_message
        MAVLink_message.__init__(self, cls.id, cls.name)
        self._fieldnames = cls.fieldnames
        self._instance_field = cls.instance_field
        self._instance_offset = cls.instance_offset
        self.onboard_control_sensors_present = onboard_control_sensors_present
        self.onboard_control_sensors_enabled = onboard_control_sensors_enabled
        self.onboard_control_sensors_health = onboard_control_sensors_health
        self.load = load
        self.voltage_battery = voltage_battery
        self.current_battery = current_battery
        self.battery_remaining = battery_remaining
        self.drop_rate_comm = drop_rate_comm
        self.errors_comm = errors_comm
        self.errors_count1 = errors_count1
        self.errors_count2 = errors_count2
        self.errors_count3 = errors_count3
        self.errors_count4 = errors_count4

    def pack(self, mav, force_mavlink1=False):
        """Pack the fields in wire order and return the complete frame built by the base class."""
        payload = struct.pack(
            "<IIIHHhHHHHHHb",
            self.onboard_control_sensors_present,
            self.onboard_control_sensors_enabled,
            self.onboard_control_sensors_health,
            self.load,
            self.voltage_battery,
            self.current_battery,
            self.drop_rate_comm,
            self.errors_comm,
            self.errors_count1,
            self.errors_count2,
            self.errors_count3,
            self.errors_count4,
            self.battery_remaining,
        )
        return MAVLink_message.pack(self, mav, 124, payload, force_mavlink1=force_mavlink1)
class MAVLink_system_time_message(MAVLink_message):
    """
    SYSTEM_TIME: the system time is the time of the master clock,
    typically the computer clock of the main onboard computer.
    """

    id = MAVLINK_MSG_ID_SYSTEM_TIME
    name = "SYSTEM_TIME"
    fieldnames = ["time_unix_usec", "time_boot_ms"]
    ordered_fieldnames = ["time_unix_usec", "time_boot_ms"]
    fieldtypes = ["uint64_t", "uint32_t"]
    fielddisplays_by_name = {}
    fieldenums_by_name = {}
    fieldunits_by_name = {}
    format = "<QI"
    native_format = bytearray(format, "ascii")
    orders = [0, 1]
    lengths = [1] * 2
    array_lengths = [0] * 2
    crc_extra = 137
    unpacker = struct.Struct(format)
    instance_field = None
    instance_offset = -1

    def __init__(self, time_unix_usec, time_boot_ms):
        cls = MAVLink_system_time_message
        MAVLink_message.__init__(self, cls.id, cls.name)
        self._fieldnames = cls.fieldnames
        self._instance_field = cls.instance_field
        self._instance_offset = cls.instance_offset
        self.time_unix_usec = time_unix_usec
        self.time_boot_ms = time_boot_ms

    def pack(self, mav, force_mavlink1=False):
        """Pack the fields in wire order and return the complete frame built by the base class."""
        payload = struct.pack("<QI", self.time_unix_usec, self.time_boot_ms)
        return MAVLink_message.pack(self, mav, 137, payload, force_mavlink1=force_mavlink1)
class MAVLink_ping_message(MAVLink_message):
    """
    PING: a ping message either requesting or responding to a ping.
    This makes it possible to measure the system latencies, including
    serial port, radio modem and UDP connections.
    """

    id = MAVLINK_MSG_ID_PING
    name = "PING"
    fieldnames = ["time_usec", "seq", "target_system", "target_component"]
    ordered_fieldnames = ["time_usec", "seq", "target_system", "target_component"]
    fieldtypes = ["uint64_t", "uint32_t", "uint8_t", "uint8_t"]
    fielddisplays_by_name = {}
    fieldenums_by_name = {}
    fieldunits_by_name = {}
    format = "<QIBB"
    native_format = bytearray(format, "ascii")
    orders = [0, 1, 2, 3]
    lengths = [1] * 4
    array_lengths = [0] * 4
    crc_extra = 237
    unpacker = struct.Struct(format)
    instance_field = None
    instance_offset = -1

    def __init__(self, time_usec, seq, target_system, target_component):
        cls = MAVLink_ping_message
        MAVLink_message.__init__(self, cls.id, cls.name)
        self._fieldnames = cls.fieldnames
        self._instance_field = cls.instance_field
        self._instance_offset = cls.instance_offset
        self.time_usec = time_usec
        self.seq = seq
        self.target_system = target_system
        self.target_component = target_component

    def pack(self, mav, force_mavlink1=False):
        """Pack the fields in wire order and return the complete frame built by the base class."""
        payload = struct.pack("<QIBB", self.time_usec, self.seq, self.target_system, self.target_component)
        return MAVLink_message.pack(self, mav, 237, payload, force_mavlink1=force_mavlink1)
# Lookup table from message ID to the class implementing that message.
mavlink_map = {
    message_class.id: message_class
    for message_class in (
        MAVLink_heartbeat_message,
        MAVLink_sys_status_message,
        MAVLink_system_time_message,
        MAVLink_ping_message,
    )
}
class MAVError(Exception):
    """Error raised by the MAVLink parsing and decoding code."""

    def __init__(self, msg):
        super(MAVError, self).__init__(msg)
        # kept as an attribute as well: handlers in this module read `.message`
        self.message = msg
class MAVString(str):
    """String whose str() form stops at the first NUL character."""

    def __init__(self, s):
        super(MAVString, self).__init__()

    def __str__(self):
        nul_at = self.find(chr(0))
        # slicing yields a plain str whether or not a NUL was found
        return self[:] if nul_at == -1 else self[0:nul_at]
class MAVLink_bad_data(MAVLink_message):
    """
    Pseudo-message wrapping a piece of bad data found in a MAVLink
    stream, together with the reason it was rejected.
    """

    def __init__(self, data, reason):
        MAVLink_message.__init__(self, MAVLINK_MSG_ID_BAD_DATA, "BAD_DATA")
        self._fieldnames = ["data", "reason"]
        self.data = self._msgbuf = data
        self.reason = reason
        self._instance_field = None

    def __str__(self):
        """Render the data as a list of hex strings, since it often contains non-printable bytes."""
        hex_items = ["%x" % (ord(item) if isinstance(item, str) else item) for item in self.data]
        return "%s {%s, data:%s}" % (self._type, self.reason, hex_items)
class MAVLink_unknown(MAVLink_message):
    """
    Pseudo-message for a message ID that is not in the XML this module
    was built from; it keeps the raw data.
    """

    def __init__(self, msgid, data):
        MAVLink_message.__init__(self, MAVLINK_MSG_ID_UNKNOWN, "UNKNOWN_%u" % msgid)
        self._fieldnames = ["data"]
        self.data = self._msgbuf = data
        self._instance_field = None

    def __str__(self):
        """Render the data as a list of hex strings, since it often contains non-printable bytes."""
        hex_items = ["%x" % (ord(item) if isinstance(item, str) else item) for item in self.data]
        return "%s {data:%s}" % (self._type, hex_items)
class MAVLinkSigning(object):
    """State used for signing outgoing frames and checking incoming signatures."""

    def __init__(self):
        # key material and settings for outgoing frames
        self.secret_key = None
        self.timestamp = 0
        self.link_id = 0
        self.sign_outgoing = False
        # optional hook consulted for frames without an accepted signature
        self.allow_unsigned_callback = None
        # last accepted timestamp per (link_id, srcSystem, srcComponent)
        self.stream_timestamps = {}
        # statistics, all starting at zero
        self.sig_count = self.badsig_count = self.goodsig_count = 0
        self.unsigned_count = self.reject_count = 0
class MAVLink(object):
"""MAVLink protocol handling class"""
def __init__(self, file, srcSystem=0, srcComponent=0, use_native=False):
    """Set up the state of one MAVLink connection.

    Args:
        file: object whose ``write`` method receives every packed outgoing message.
        srcSystem: system ID stamped on outgoing messages.
        srcComponent: component ID stamped on outgoing messages.
        use_native: ask for the mavnative parser; it is only used when
            native support was detected at import time.
    """
    # outgoing side
    self.seq = 0
    self.file = file
    self.srcSystem = srcSystem
    self.srcComponent = srcComponent

    # optional hooks for received and sent messages
    self.callback = self.callback_args = self.callback_kwargs = None
    self.send_callback = self.send_callback_args = self.send_callback_kwargs = None

    # receive buffer; buf_index marks how much of it has been consumed
    self.buf = bytearray()
    self.buf_index = 0
    self.expected_length = HEADER_LEN_V1 + 2
    self.have_prefix_error = False
    self.robust_parsing = False

    self.protocol_marker = 253
    self.little_endian = True
    self.crc_extra = True
    self.sort_fields = True

    # statistics
    self.total_packets_sent = 0
    self.total_bytes_sent = 0
    self.total_packets_received = 0
    self.total_bytes_received = 0
    self.total_receive_errors = 0
    self.startup_time = time.time()

    self.signing = MAVLinkSigning()

    self.native = None
    if native_supported and (use_native or native_testing or native_force):
        print("NOTE: mavnative is currently beta-test code")
        self.native = mavnative.NativeConnection(MAVLink_message, mavlink_map)
    if native_testing:
        self.test_buf = bytearray()

    # precompiled struct layouts used while parsing
    make_struct = struct.Struct
    self.mav20_unpacker = make_struct("<cBBBBBBHB")
    self.mav10_unpacker = make_struct("<cBBBBB")
    self.mav20_h3_unpacker = make_struct("BBB")
    self.mav_csum_unpacker = make_struct("<H")
    self.mav_sign_unpacker = make_struct("<IH")
def set_callback(self, callback, *args, **kwargs):
    """Register *callback*, plus extra arguments to pass along, for every received message."""
    self.callback, self.callback_args, self.callback_kwargs = callback, args, kwargs
def set_send_callback(self, callback, *args, **kwargs):
    """Register *callback*, plus extra arguments to pass along, for every sent message."""
    self.send_callback, self.send_callback_args, self.send_callback_kwargs = callback, args, kwargs
def send(self, mavmsg, force_mavlink1=False):
    """Pack *mavmsg*, write it to the output file and update the send counters."""
    packed = mavmsg.pack(self, force_mavlink1=force_mavlink1)
    self.file.write(packed)
    # the sequence number wraps at one byte
    self.seq = (self.seq + 1) % 256
    self.total_packets_sent += 1
    self.total_bytes_sent += len(packed)
    if not self.send_callback:
        return
    self.send_callback(mavmsg, *self.send_callback_args, **self.send_callback_kwargs)
def buf_len(self):
    """Return how many buffered bytes lie at or after the current read index."""
    unread = len(self.buf) - self.buf_index
    return unread
def bytes_needed(self):
    """return number of bytes needed for next parsing stage (never less than 1)"""
    # the native parser, when in use, tracks its own expected length
    tracker = self.native if self.native else self
    missing = tracker.expected_length - self.buf_len()
    return missing if missing > 0 else 1
def __parse_char_native(self, c):
    """Hand *c* to the native parser; kept as its own method so it shows up in profiling results."""
    return self.native.parse_chars(c)
def __callbacks(self, msg):
    """Invoke the receive callback, if one is set; a separate method to make profiling results easier to read."""
    if not self.callback:
        return
    self.callback(msg, *self.callback_args, **self.callback_kwargs)
def parse_char(self, c):
    """Append the bytes in *c* to the receive buffer and return a newly completed message, or None."""
    self.buf.extend(c)
    self.total_bytes_received += len(c)

    if not self.native:
        m = self.__parse_char_legacy()
    elif not native_testing:
        m = self.__parse_char_native(self.buf)
    else:
        # testing mode: run both parsers and insist that they agree
        self.test_buf.extend(c)
        m = self.__parse_char_native(self.test_buf)
        m2 = self.__parse_char_legacy()
        if m2 != m:
            print("Native: %s\nLegacy: %s\n" % (m, m2))
            raise Exception("Native vs. Legacy mismatch")

    if m is not None:
        self.total_packets_received += 1
        self.__callbacks(m)
    elif self.buf_len() == 0 and self.buf_index != 0:
        # XXX The idea here is if we've read something and there's nothing left in
        # the buffer, reset it to 0 which frees the memory
        self.buf = bytearray()
        self.buf_index = 0
    return m
def __parse_char_legacy(self):
    """Try to take one message off the receive buffer (uses no native code).

    Returns a decoded message, a MAVLink_bad_data object when robust parsing
    is enabled and something is wrong, or None when more bytes are needed.
    Without robust parsing, problems are raised as MAVError.
    """
    has_data = self.buf_len() >= 1
    lead = self.buf[self.buf_index] if has_data else None
    header_len = HEADER_LEN_V2 if lead == PROTOCOL_MARKER_V2 else HEADER_LEN_V1

    if has_data and lead not in (PROTOCOL_MARKER_V1, PROTOCOL_MARKER_V2):
        # not a start-of-frame marker: consume the byte and report it
        magic = lead
        self.buf_index += 1
        if self.robust_parsing:
            m = MAVLink_bad_data(bytearray([magic]), "Bad prefix")
            self.expected_length = header_len + 2
            self.total_receive_errors += 1
            return m
        if self.have_prefix_error:
            # only the first byte of a run of bad bytes raises
            return None
        self.have_prefix_error = True
        self.total_receive_errors += 1
        raise MAVError("invalid MAVLink prefix '%s'" % magic)
    self.have_prefix_error = False

    if self.buf_len() >= 3:
        # the first three bytes are enough to compute the full frame length
        sbuf = self.buf[self.buf_index : 3 + self.buf_index]
        if sys.version_info.major < 3:
            sbuf = str(sbuf)
        (magic, self.expected_length, incompat_flags) = self.mav20_h3_unpacker.unpack(sbuf)
        if magic == PROTOCOL_MARKER_V2 and (incompat_flags & MAVLINK_IFLAG_SIGNED):
            self.expected_length += MAVLINK_SIGNATURE_BLOCK_LEN
        self.expected_length += header_len + 2

    frame_ready = self.expected_length >= (header_len + 2) and self.buf_len() >= self.expected_length
    if not frame_ready:
        return None

    mbuf = array.array("B", self.buf[self.buf_index : self.buf_index + self.expected_length])
    self.buf_index += self.expected_length
    self.expected_length = header_len + 2
    try:
        if magic == PROTOCOL_MARKER_V2 and (incompat_flags & ~MAVLINK_IFLAG_SIGNED) != 0:
            raise MAVError("invalid incompat_flags 0x%x 0x%x %u" % (incompat_flags, magic, self.expected_length))
        return self.decode(mbuf)
    except MAVError as reason:
        if not self.robust_parsing:
            raise
        # robust parsing turns the error into a bad-data message
        m = MAVLink_bad_data(mbuf, reason.message)
        self.total_receive_errors += 1
        return m
def parse_buffer(self, s):
    """Feed the bytes in *s* to the parser and return the list of completed messages, or None if there are none."""
    first = self.parse_char(s)
    if first is None:
        return None
    messages = [first]
    # keep parsing, without adding new bytes, until the buffer yields nothing more
    while True:
        following = self.parse_char("")
        if following is None:
            break
        messages.append(following)
    return messages
def check_signature(self, msgbuf, srcSystem, srcComponent):
    """Check the signature block at the end of an incoming signed frame.

    The last 13 bytes of *msgbuf* are read as the link ID (1 byte), the
    timestamp (6 bytes, little-endian) and the signature (6 bytes).

    Args:
        msgbuf: the complete frame, as bytes, bytearray or array.array.
        srcSystem: system ID of the sender.
        srcComponent: component ID of the sender.

    Returns:
        True if the timestamp is acceptable and the signature matches,
        otherwise False.  The per-stream timestamp and our own signing
        timestamp are only updated for an accepted frame, so a frame with a
        forged signature cannot alter the replay-protection state.
    """
    import hmac  # local import: only needed here, for the constant-time comparison

    if isinstance(msgbuf, array.array):
        # tobytes() is the current name; tostring() is the Python 2 spelling
        try:
            msgbuf = msgbuf.tobytes()
        except AttributeError:
            msgbuf = msgbuf.tostring()
    timestamp_buf = msgbuf[-12:-6]
    link_id = msgbuf[-13]
    (tlow, thigh) = self.mav_sign_unpacker.unpack(timestamp_buf)
    timestamp = tlow + (thigh << 32)

    signing = self.signing
    # see if the timestamp is acceptable
    stream_key = (link_id, srcSystem, srcComponent)
    if stream_key in signing.stream_timestamps:
        if timestamp <= signing.stream_timestamps[stream_key]:
            # reject old timestamp
            return False
    elif timestamp + 6000 * 1000 < signing.timestamp:
        # a new stream has appeared. Accept the timestamp only if it is at most
        # one minute behind our current timestamp
        return False

    # expected signature: first 6 bytes of SHA-256 over the secret key
    # followed by the frame without its trailing signature
    h = hashlib.new("sha256")
    h.update(signing.secret_key)
    h.update(msgbuf[:-6])
    expected = h.digest()[:6]
    received = bytes(bytearray(msgbuf[-6:]))
    if not hmac.compare_digest(expected, received):
        return False

    # Record state only now that the frame is authenticated.
    signing.stream_timestamps[stream_key] = timestamp
    # the timestamp we next send with is the max of the received timestamp and
    # our current timestamp
    signing.timestamp = max(signing.timestamp, timestamp)
    return True
def decode(self, msgbuf):
    """decode a buffer as a MAVLink message

    msgbuf : one complete frame (header, payload, checksum and optional
             signature block). Slices of it are appended to and extended
             below, so it must be a mutable byte sequence such as a
             bytearray.

    Returns an instance of the class registered in mavlink_map for the
    message id, or MAVLink_unknown(msgId, msgbuf) when the id is not in
    the map.

    Raises MAVError on a malformed header, a bad prefix, a length
    mismatch, a CRC mismatch (unless MAVLINK_IGNORE_CRC is set), a
    rejected signature, or a payload that cannot be unpacked or
    instantiated.
    """
    # decode the header
    if msgbuf[0] != PROTOCOL_MARKER_V1:
        headerlen = 10
        try:
            magic, mlen, incompat_flags, compat_flags, seq, srcSystem, srcComponent, msgIdlow, msgIdhigh = self.mav20_unpacker.unpack(msgbuf[:headerlen])
        except struct.error as emsg:
            raise MAVError("Unable to unpack MAVLink header: %s" % emsg)
        # the message id is carried as a 16-bit low part and a high part
        msgId = msgIdlow | (msgIdhigh << 16)
    else:
        headerlen = 6
        try:
            magic, mlen, seq, srcSystem, srcComponent, msgId = self.mav10_unpacker.unpack(msgbuf[:headerlen])
        except struct.error as emsg:
            raise MAVError("Unable to unpack MAVLink header: %s" % emsg)
        # the 6-byte header carries no flag bytes
        incompat_flags = 0
        compat_flags = 0
    mapkey = msgId

    if (incompat_flags & MAVLINK_IFLAG_SIGNED) != 0:
        signature_len = MAVLINK_SIGNATURE_BLOCK_LEN
    else:
        signature_len = 0
    # everything after the payload: 2 checksum bytes plus the signature block
    trailer_len = 2 + signature_len

    if ord(magic) != PROTOCOL_MARKER_V1 and ord(magic) != PROTOCOL_MARKER_V2:
        raise MAVError("invalid MAVLink prefix '%s'" % magic)
    payload_len = len(msgbuf) - (headerlen + trailer_len)
    if mlen != payload_len:
        raise MAVError("invalid MAVLink message length. Got %u expected %u, msgId=%u headerlen=%u" % (payload_len, mlen, msgId, headerlen))

    if mapkey not in mavlink_map:
        return MAVLink_unknown(msgId, msgbuf)

    # look up the message class and its layout tables
    msgtype = mavlink_map[mapkey]
    fmt = msgtype.format
    order_map = msgtype.orders
    len_map = msgtype.lengths
    crc_extra = msgtype.crc_extra

    # decode the checksum
    try:
        (crc,) = self.mav_csum_unpacker.unpack(msgbuf[-trailer_len:][:2])
    except struct.error as emsg:
        raise MAVError("Unable to unpack MAVLink CRC: %s" % emsg)
    # the CRC covers everything after the marker byte up to the checksum,
    # followed by the per-message crc_extra byte
    crcbuf = msgbuf[1:-trailer_len]
    crcbuf.append(crc_extra)
    crc2 = x25crc(crcbuf)
    if crc != crc2.crc and not MAVLINK_IGNORE_CRC:
        raise MAVError("invalid MAVLink CRC in msgID %u 0x%04x should be 0x%04x" % (msgId, crc, crc2.crc))

    sig_ok = False
    if signature_len == MAVLINK_SIGNATURE_BLOCK_LEN:
        self.signing.sig_count += 1
    if self.signing.secret_key is not None:
        accept_signature = False
        if signature_len == MAVLINK_SIGNATURE_BLOCK_LEN:
            sig_ok = self.check_signature(msgbuf, srcSystem, srcComponent)
            accept_signature = sig_ok
            if sig_ok:
                self.signing.goodsig_count += 1
            else:
                self.signing.badsig_count += 1
            # a bad signature may still be let through by the callback
            if not accept_signature and self.signing.allow_unsigned_callback is not None:
                accept_signature = self.signing.allow_unsigned_callback(self, msgId)
                if accept_signature:
                    self.signing.unsigned_count += 1
                else:
                    self.signing.reject_count += 1
        elif self.signing.allow_unsigned_callback is not None:
            accept_signature = self.signing.allow_unsigned_callback(self, msgId)
            if accept_signature:
                self.signing.unsigned_count += 1
            else:
                self.signing.reject_count += 1
        if not accept_signature:
            raise MAVError("Invalid signature")

    csize = msgtype.unpacker.size
    mbuf = msgbuf[headerlen:-trailer_len]
    if len(mbuf) < csize:
        # zero pad to give right size (the sender may have stripped
        # trailing zero bytes from the payload)
        mbuf.extend([0] * (csize - len(mbuf)))
    mbuf = mbuf[:csize]
    try:
        t = msgtype.unpacker.unpack(mbuf)
    except struct.error as emsg:
        raise MAVError("Unable to unpack MAVLink payload type=%s fmt=%s payloadLength=%u: %s" % (msgtype, fmt, len(mbuf), emsg))

    # handle sorted fields: map wire order back to declaration order
    wire_values = list(t)
    if sum(len_map) == len(len_map):
        # message has no arrays in it
        tlist = [wire_values[order_map[i]] for i in range(0, len(wire_values))]
    else:
        # message has some arrays
        tlist = []
        for order in order_map:
            L = len_map[order]
            tip = sum(len_map[:order])
            field = wire_values[tip]
            if L == 1 or isinstance(field, str):
                tlist.append(field)
            else:
                tlist.append(wire_values[tip : (tip + L)])

    # terminate any strings
    for i in range(0, len(tlist)):
        if msgtype.fieldtypes[i] == "char":
            if sys.version_info.major >= 3:
                tlist[i] = to_string(tlist[i])
            tlist[i] = str(MAVString(tlist[i]))
    t = tuple(tlist)

    # construct the message object
    try:
        m = msgtype(*t)
    except Exception as emsg:
        raise MAVError("Unable to instantiate MAVLink message of type %s : %s" % (msgtype, emsg))
    m._signed = sig_ok
    if m._signed:
        # the link id is the first byte of the trailing signature block
        m._link_id = msgbuf[-MAVLINK_SIGNATURE_BLOCK_LEN]
    m._msgbuf = msgbuf
    # slice from the real header length; a fixed offset of 6 would leave
    # the last four header bytes of a 10-byte header in the payload
    m._payload = msgbuf[headerlen:-trailer_len]
    m._crc = crc
    m._header = MAVLink_header(msgId, incompat_flags, compat_flags, mlen, seq, srcSystem, srcComponent)
    return m
def heartbeat_encode(self, type, autopilot, base_mode, custom_mode, system_status, mavlink_version=3):
    """
    Build (but do not send) a HEARTBEAT message, which shows that a
    system is present and responding and lets the receiver adapt to the
    MAV and autopilot type.

    type            : Type of the MAV, defined in MAV_TYPE ENUM (type:uint8_t)
    autopilot       : Autopilot type / class, defined in MAV_AUTOPILOT ENUM (type:uint8_t)
    base_mode       : System mode bitfield, see MAV_MODE_FLAG ENUM (type:uint8_t)
    custom_mode     : Bitfield for autopilot-specific flags (type:uint32_t)
    system_status   : System status flag, see MAV_STATE ENUM (type:uint8_t)
    mavlink_version : MAVLink version, normally left at its default (type:uint8_t)

    Returns the new MAVLink_heartbeat_message instance.
    """
    field_values = (type, autopilot, base_mode, custom_mode, system_status, mavlink_version)
    return MAVLink_heartbeat_message(*field_values)
def heartbeat_send(self, type, autopilot, base_mode, custom_mode, system_status, mavlink_version=3, force_mavlink1=False):
    """
    Encode a HEARTBEAT message with heartbeat_encode() and pass it to
    self.send().

    type            : Type of the MAV, defined in MAV_TYPE ENUM (type:uint8_t)
    autopilot       : Autopilot type / class, defined in MAV_AUTOPILOT ENUM (type:uint8_t)
    base_mode       : System mode bitfield, see MAV_MODE_FLAG ENUM (type:uint8_t)
    custom_mode     : Bitfield for autopilot-specific flags (type:uint32_t)
    system_status   : System status flag, see MAV_STATE ENUM (type:uint8_t)
    mavlink_version : MAVLink version, normally left at its default (type:uint8_t)
    force_mavlink1  : forwarded unchanged to self.send()

    Returns whatever self.send() returns.
    """
    msg = self.heartbeat_encode(type, autopilot, base_mode, custom_mode, system_status, mavlink_version)
    return self.send(msg, force_mavlink1=force_mavlink1)
def sys_status_encode(self, onboard_control_sensors_present, onboard_control_sensors_enabled, onboard_control_sensors_health, load, voltage_battery, current_battery, battery_remaining, drop_rate_comm, errors_comm, errors_count1, errors_count2, errors_count3, errors_count4):
    """
    Build (but do not send) a SYS_STATUS message describing the general
    system state: which sensors are present, enabled and healthy, the
    main-loop load, battery figures and communication error counters.

    onboard_control_sensors_present : Bitmask of controllers/sensors that are present (0: not present, 1: present), indices per MAV_SYS_STATUS_SENSOR (type:uint32_t)
    onboard_control_sensors_enabled : Bitmask of controllers/sensors that are enabled (0: not enabled, 1: enabled), indices per MAV_SYS_STATUS_SENSOR (type:uint32_t)
    onboard_control_sensors_health  : Bitmask of controllers/sensors that are operational or have an error, indices per MAV_SYS_STATUS_SENSOR (type:uint32_t)
    load              : Maximum usage of the mainloop time (0%: 0, 100%: 1000), should always be below 1000 (type:uint16_t)
    voltage_battery   : Battery voltage in millivolts (type:uint16_t)
    current_battery   : Battery current in units of 10 milliamperes, -1: autopilot does not measure the current (type:int16_t)
    battery_remaining : Remaining battery energy (0%: 0, 100%: 100), -1 is a special value (type:int8_t)
    drop_rate_comm    : Communication drops (0%: 0, 100%: 10'000) over all links (type:uint16_t)
    errors_comm       : Communication errors over all links (type:uint16_t)
    errors_count1     : Autopilot-specific errors (type:uint16_t)
    errors_count2     : Autopilot-specific errors (type:uint16_t)
    errors_count3     : Autopilot-specific errors (type:uint16_t)
    errors_count4     : Autopilot-specific errors (type:uint16_t)

    Returns the new MAVLink_sys_status_message instance.
    """
    field_values = (
        onboard_control_sensors_present,
        onboard_control_sensors_enabled,
        onboard_control_sensors_health,
        load,
        voltage_battery,
        current_battery,
        battery_remaining,
        drop_rate_comm,
        errors_comm,
        errors_count1,
        errors_count2,
        errors_count3,
        errors_count4,
    )
    return MAVLink_sys_status_message(*field_values)
def sys_status_send(self, onboard_control_sensors_present, onboard_control_sensors_enabled, onboard_control_sensors_health, load, voltage_battery, current_battery, battery_remaining, drop_rate_comm, errors_comm, errors_count1, errors_count2, errors_count3, errors_count4, force_mavlink1=False):
    """
    Encode a SYS_STATUS message with sys_status_encode() and pass it to
    self.send().

    onboard_control_sensors_present : Bitmask of controllers/sensors that are present (0: not present, 1: present), indices per MAV_SYS_STATUS_SENSOR (type:uint32_t)
    onboard_control_sensors_enabled : Bitmask of controllers/sensors that are enabled (0: not enabled, 1: enabled), indices per MAV_SYS_STATUS_SENSOR (type:uint32_t)
    onboard_control_sensors_health  : Bitmask of controllers/sensors that are operational or have an error, indices per MAV_SYS_STATUS_SENSOR (type:uint32_t)
    load              : Maximum usage of the mainloop time (0%: 0, 100%: 1000), should always be below 1000 (type:uint16_t)
    voltage_battery   : Battery voltage in millivolts (type:uint16_t)
    current_battery   : Battery current in units of 10 milliamperes, -1: autopilot does not measure the current (type:int16_t)
    battery_remaining : Remaining battery energy (0%: 0, 100%: 100), -1 is a special value (type:int8_t)
    drop_rate_comm    : Communication drops (0%: 0, 100%: 10'000) over all links (type:uint16_t)
    errors_comm       : Communication errors over all links (type:uint16_t)
    errors_count1     : Autopilot-specific errors (type:uint16_t)
    errors_count2     : Autopilot-specific errors (type:uint16_t)
    errors_count3     : Autopilot-specific errors (type:uint16_t)
    errors_count4     : Autopilot-specific errors (type:uint16_t)
    force_mavlink1    : forwarded unchanged to self.send()

    Returns whatever self.send() returns.
    """
    msg = self.sys_status_encode(
        onboard_control_sensors_present,
        onboard_control_sensors_enabled,
        onboard_control_sensors_health,
        load,
        voltage_battery,
        current_battery,
        battery_remaining,
        drop_rate_comm,
        errors_comm,
        errors_count1,
        errors_count2,
        errors_count3,
        errors_count4,
    )
    return self.send(msg, force_mavlink1=force_mavlink1)
def system_time_encode(self, time_unix_usec, time_boot_ms):
    """
    Build (but do not send) a SYSTEM_TIME message carrying the time of
    the master clock.

    time_unix_usec : Timestamp of the master clock in microseconds since UNIX epoch (type:uint64_t)
    time_boot_ms   : Timestamp of the component clock since boot time in milliseconds (type:uint32_t)

    Returns the new MAVLink_system_time_message instance.
    """
    field_values = (time_unix_usec, time_boot_ms)
    return MAVLink_system_time_message(*field_values)
def system_time_send(self, time_unix_usec, time_boot_ms, force_mavlink1=False):
    """
    Encode a SYSTEM_TIME message with system_time_encode() and pass it
    to self.send().

    time_unix_usec : Timestamp of the master clock in microseconds since UNIX epoch (type:uint64_t)
    time_boot_ms   : Timestamp of the component clock since boot time in milliseconds (type:uint32_t)
    force_mavlink1 : forwarded unchanged to self.send()

    Returns whatever self.send() returns.
    """
    msg = self.system_time_encode(time_unix_usec, time_boot_ms)
    return self.send(msg, force_mavlink1=force_mavlink1)
def ping_encode(self, time_usec, seq, target_system, target_component):
    """
    Build (but do not send) a PING message, used either to request or
    to answer a ping so that system latencies can be measured.

    time_usec        : Unix timestamp in microseconds, or time since system boot if smaller than the MAVLink epoch (1.1.2009) (type:uint64_t)
    seq              : PING sequence (type:uint32_t)
    target_system    : 0 requests a ping from all receiving systems; a value greater than 0 marks a ping response and is the id of the requesting system (type:uint8_t)
    target_component : 0 requests a ping from all receiving components; a value greater than 0 marks a ping response (type:uint8_t)

    Returns the new MAVLink_ping_message instance.
    """
    field_values = (time_usec, seq, target_system, target_component)
    return MAVLink_ping_message(*field_values)
def ping_send(self, time_usec, seq, target_system, target_component, force_mavlink1=False):
    """
    Encode a PING message with ping_encode() and pass it to self.send().

    time_usec        : Unix timestamp in microseconds, or time since system boot if smaller than the MAVLink epoch (1.1.2009) (type:uint64_t)
    seq              : PING sequence (type:uint32_t)
    target_system    : 0 requests a ping from all receiving systems; a value greater than 0 marks a ping response and is the id of the requesting system (type:uint8_t)
    target_component : 0 requests a ping from all receiving components; a value greater than 0 marks a ping response (type:uint8_t)
    force_mavlink1   : forwarded unchanged to self.send()

    Returns whatever self.send() returns.
    """
    msg = self.ping_encode(time_usec, seq, target_system, target_component)
    return self.send(msg, force_mavlink1=force_mavlink1)
'''
MAVLink protocol implementation (auto-generated by mavgen.py)
Generated from: common.xml
Note: this file has been auto-generated. DO NOT EDIT
'''
from __future__ import print_function
from builtins import range
from builtins import object
import struct, array, time, json, os, sys, platform
from pymavlink.generator.mavcrc import x25crc
import hashlib
# Protocol version and dialect this module was generated for
WIRE_PROTOCOL_VERSION = '2.0'
DIALECT = 'before'

# First byte of a frame, and the header length that goes with each marker
PROTOCOL_MARKER_V1 = 0xFE
PROTOCOL_MARKER_V2 = 0xFD
HEADER_LEN_V1 = 6
HEADER_LEN_V2 = 10

# Size of the trailing signature block and the incompat flag announcing it
MAVLINK_SIGNATURE_BLOCK_LEN = 13
MAVLINK_IFLAG_SIGNED = 0x01

# Not yet supported on other dialects
native_supported = platform.system() != 'Windows'
# Will force use of native code regardless of what client app wants
native_force = 'MAVNATIVE_FORCE' in os.environ
# Will force both native and legacy code to be used and their results compared
native_testing = 'MAVNATIVE_TESTING' in os.environ

if native_supported and float(WIRE_PROTOCOL_VERSION) <= 1:
    try:
        import mavnative
    except ImportError:
        print('ERROR LOADING MAVNATIVE - falling back to python implementation')
        native_supported = False
else:
    # mavnative isn't supported for MAVLink2 yet
    native_supported = False

# allow MAV_IGNORE_CRC=1 to ignore CRC, allowing some
# corrupted msgs to be seen.
# Environment values are strings, so the raw value "0" would be truthy;
# treat an unset, empty or "0" variable as "do not ignore".
MAVLINK_IGNORE_CRC = os.environ.get("MAV_IGNORE_CRC", "0").strip() not in ("", "0")

# some base types from mavlink_types.h
MAVLINK_TYPE_CHAR = 0
MAVLINK_TYPE_UINT8_T = 1
MAVLINK_TYPE_INT8_T = 2
MAVLINK_TYPE_UINT16_T = 3
MAVLINK_TYPE_INT16_T = 4
MAVLINK_TYPE_UINT32_T = 5
MAVLINK_TYPE_INT32_T = 6
MAVLINK_TYPE_UINT64_T = 7
MAVLINK_TYPE_INT64_T = 8
MAVLINK_TYPE_FLOAT = 9
MAVLINK_TYPE_DOUBLE = 10
# swiped from DFReader.py
def to_string(s):
    '''desperate attempt to convert a string regardless of what garbage we get

    s : bytes-like or text value to convert.

    Valid UTF-8 bytes are decoded. A Python 3 str is returned unchanged.
    Anything that cannot be converted cleanly yields the leading
    characters that could be salvaged followed by the marker '_XXX'.
    '''
    try:
        return s.decode("utf-8")
    except Exception:
        pass
    # On Python 3 a str has no decode() and lands here; it is already text.
    # (On Python 2 str is bytes, so this test is false and we carry on.)
    if isinstance(s, str) and not isinstance(s, bytes):
        return s
    try:
        s2 = s.encode('utf-8', 'ignore')
        # formatting probe: raises if s2 cannot be used as text
        x = u"%s" % s2
        return s2
    except Exception:
        pass
    # so it's a nasty one. Let's grab as many characters as we can
    r = ''
    try:
        for c in s:
            if isinstance(c, int):
                # iterating bytes on Python 3 yields ints; keep the ASCII
                # prefix and stop at the first byte outside ASCII
                if c > 0x7F:
                    break
                r += chr(c)
                continue
            r2 = r + c
            r2 = r2.encode('ascii', 'ignore')
            x = u"%s" % r2
            r = r2
    except Exception:
        pass
    return r + '_XXX'
class MAVLink_header(object):
    '''Header fields of one MAVLink message, with wire serialisation.'''

    def __init__(self, msgId, incompat_flags=0, compat_flags=0, mlen=0, seq=0, srcSystem=0, srcComponent=0):
        '''Store the header fields; everything except msgId defaults to 0.'''
        self.mlen = mlen
        self.seq = seq
        self.srcSystem = srcSystem
        self.srcComponent = srcComponent
        self.msgId = msgId
        self.incompat_flags = incompat_flags
        self.compat_flags = compat_flags

    def pack(self, force_mavlink1=False):
        '''Return the packed header bytes.

        The 6-byte layout is used when the module is not built for
        protocol '2.0' or when force_mavlink1 is true; otherwise the
        10-byte layout is used.
        '''
        if WIRE_PROTOCOL_VERSION != '2.0' or force_mavlink1:
            short_header = (PROTOCOL_MARKER_V1, self.mlen, self.seq, self.srcSystem, self.srcComponent, self.msgId)
            return struct.pack('<BBBBBB', *short_header)
        # 10-byte layout: marker 253, then the message id split into a
        # 16-bit low part and the remaining high bits
        msgid_low = self.msgId & 0xFFFF
        msgid_high = self.msgId >> 16
        long_header = (253, self.mlen, self.incompat_flags, self.compat_flags, self.seq, self.srcSystem, self.srcComponent, msgid_low, msgid_high)
        return struct.pack('<BBBBBBBHB', *long_header)
class MAVLink_message(object):
    '''base MAVLink message class

    Holds the header, raw buffer, payload, CRC and signing state of one
    message, and provides packing, comparison and dict/JSON conversion
    for the generated message subclasses.
    '''

    def __init__(self, msgId, name):
        '''Create an empty message with the given numeric id and type name.'''
        self._header = MAVLink_header(msgId)
        self._payload = None
        self._msgbuf = None
        self._crc = None
        self._fieldnames = []
        self._type = name
        self._signed = False
        self._link_id = None
        self._instances = None
        self._instance_field = None

    def format_attr(self, field):
        '''override field getter

        Returns the named attribute; bytes values are converted with
        to_string() and stripped of trailing NUL characters.
        '''
        raw_attr = getattr(self, field)
        if isinstance(raw_attr, bytes):
            raw_attr = to_string(raw_attr).rstrip("\00")
        return raw_attr

    def get_msgbuf(self):
        '''Return the raw message buffer as a bytearray.'''
        if isinstance(self._msgbuf, bytearray):
            return self._msgbuf
        return bytearray(self._msgbuf)

    def get_header(self):
        return self._header

    def get_payload(self):
        return self._payload

    def get_crc(self):
        return self._crc

    def get_fieldnames(self):
        return self._fieldnames

    def get_type(self):
        return self._type

    def get_msgId(self):
        return self._header.msgId

    def get_srcSystem(self):
        return self._header.srcSystem

    def get_srcComponent(self):
        return self._header.srcComponent

    def get_seq(self):
        return self._header.seq

    def get_signed(self):
        return self._signed

    def get_link_id(self):
        return self._link_id

    def __str__(self):
        '''Render as 'TYPE {name : value, ...}'; 'TYPE {}' without fields.'''
        # join() keeps the braces balanced when there are no fields;
        # trimming a trailing ', ' would eat the ' {' instead
        body = ', '.join('%s : %s' % (a, self.format_attr(a)) for a in self._fieldnames)
        return '%s {%s}' % (self._type, body)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __eq__(self, other):
        '''Compare type, sequence, source ids and all field values.

        Objects that do not offer the message accessors compare unequal
        instead of raising AttributeError.
        '''
        if other is None:
            return False
        accessors = ('get_type', 'get_seq', 'get_srcSystem', 'get_srcComponent', 'format_attr')
        if not all(hasattr(other, attr) for attr in accessors):
            return False

        if self.get_type() != other.get_type():
            return False

        # We do not compare CRC because native code doesn't provide it
        #if self.get_crc() != other.get_crc():
        #    return False

        if self.get_seq() != other.get_seq():
            return False

        if self.get_srcSystem() != other.get_srcSystem():
            return False

        if self.get_srcComponent() != other.get_srcComponent():
            return False

        for a in self._fieldnames:
            if self.format_attr(a) != other.format_attr(a):
                return False

        return True

    def to_dict(self):
        '''Return the fields as a dict, plus 'mavpackettype' for the type.'''
        d = {}
        d['mavpackettype'] = self._type
        for a in self._fieldnames:
            d[a] = self.format_attr(a)
        return d

    def to_json(self):
        '''Return to_dict() serialised as a JSON string.'''
        return json.dumps(self.to_dict())

    def sign_packet(self, mav):
        '''Append link id, timestamp and signature to the message buffer.

        Uses mav.signing.link_id, mav.signing.timestamp and
        mav.signing.secret_key, and increments mav.signing.timestamp.
        '''
        h = hashlib.new('sha256')
        # 1 byte link id plus the first 6 bytes of the little-endian timestamp
        self._msgbuf += struct.pack('<BQ', mav.signing.link_id, mav.signing.timestamp)[:7]
        h.update(mav.signing.secret_key)
        h.update(self._msgbuf)
        # the signature is the first 6 bytes of the digest
        sig = h.digest()[:6]
        self._msgbuf += sig
        mav.signing.timestamp += 1

    def pack(self, mav, crc_extra, payload, force_mavlink1=False):
        '''Assemble header, payload, CRC and optional signature.

        mav            : connection object supplying seq, srcSystem,
                         srcComponent and signing state
        crc_extra      : per-message byte folded into the CRC
        payload        : packed field bytes
        force_mavlink1 : use the 6-byte header, no zero stripping, no signing

        Stores header, payload, CRC and buffer on self and returns the
        complete message buffer.
        '''
        plen = len(payload)
        if WIRE_PROTOCOL_VERSION != '1.0' and not force_mavlink1:
            # in MAVLink2 we can strip trailing zeros off payloads. This allows for simple
            # variable length arrays and smaller packets
            nullbyte = chr(0)
            # in Python2, type("fred") is str but also type("fred")==bytes,
            # so the type name is compared instead of using isinstance();
            # indexing Python 3 bytes yields ints, hence the int 0 here
            if str(type(payload)) == "<class 'bytes'>":
                nullbyte = 0
            # always keep at least one payload byte
            while plen > 1 and payload[plen-1] == nullbyte:
                plen -= 1
        self._payload = payload[:plen]
        incompat_flags = 0
        if mav.signing.sign_outgoing:
            incompat_flags |= MAVLINK_IFLAG_SIGNED
        self._header = MAVLink_header(self._header.msgId,
                                      incompat_flags=incompat_flags, compat_flags=0,
                                      mlen=len(self._payload), seq=mav.seq,
                                      srcSystem=mav.srcSystem, srcComponent=mav.srcComponent)
        self._msgbuf = self._header.pack(force_mavlink1=force_mavlink1) + self._payload
        # the CRC skips the leading marker byte and ends with crc_extra
        crc = x25crc(self._msgbuf[1:])
        crc.accumulate_str(struct.pack('B', crc_extra))
        self._crc = crc.crc
        self._msgbuf += struct.pack('<H', self._crc)
        if mav.signing.sign_outgoing and not force_mavlink1:
            self.sign_packet(mav)
        return self._msgbuf

    def __getitem__(self, key):
        '''support indexing, allowing for multi-instance sensors in one message'''
        if self._instances is None:
            raise IndexError()
        if key not in self._instances:
            raise IndexError()
        return self._instances[key]
# enums
class EnumEntry(object):
    '''One value of a MAVLink enum: its name, description and parameter notes.'''

    def __init__(self, name, description):
        '''Store name and description and start with an empty param mapping.'''
        self.name, self.description = name, description
        # filled in afterwards by the generated tables, keyed by parameter number
        self.param = dict()
# Lookup table for every enum in this dialect:
#   enums[<enum name>][<integer value>] -> EnumEntry(name, description)
# Each value is also exposed as a module-level integer constant.
# Every enum ends with an *_ENUM_END entry that is one greater than the
# highest value listed for that enum here.
enums = {}
# MAV_AUTOPILOT
enums['MAV_AUTOPILOT'] = {}
MAV_AUTOPILOT_GENERIC = 0 # Generic autopilot, full support for everything
enums['MAV_AUTOPILOT'][0] = EnumEntry('MAV_AUTOPILOT_GENERIC', '''Generic autopilot, full support for everything''')
MAV_AUTOPILOT_RESERVED = 1 # Reserved for future use.
enums['MAV_AUTOPILOT'][1] = EnumEntry('MAV_AUTOPILOT_RESERVED', '''Reserved for future use.''')
MAV_AUTOPILOT_SLUGS = 2 # SLUGS autopilot, http://slugsuav.soe.ucsc.edu
enums['MAV_AUTOPILOT'][2] = EnumEntry('MAV_AUTOPILOT_SLUGS', '''SLUGS autopilot, http://slugsuav.soe.ucsc.edu''')
MAV_AUTOPILOT_AEROB = 16 # Aerob -- http://aerob.ru
enums['MAV_AUTOPILOT'][16] = EnumEntry('MAV_AUTOPILOT_AEROB', '''Aerob -- http://aerob.ru''')
MAV_AUTOPILOT_ASLUAV = 17 # ASLUAV autopilot -- http://www.asl.ethz.ch
enums['MAV_AUTOPILOT'][17] = EnumEntry('MAV_AUTOPILOT_ASLUAV', '''ASLUAV autopilot -- http://www.asl.ethz.ch''')
MAV_AUTOPILOT_ENUM_END = 18 # end marker, empty description
enums['MAV_AUTOPILOT'][18] = EnumEntry('MAV_AUTOPILOT_ENUM_END', '''''')
# MAV_TYPE
enums['MAV_TYPE'] = {}
MAV_TYPE_GENERIC = 0 # Generic micro air vehicle.
enums['MAV_TYPE'][0] = EnumEntry('MAV_TYPE_GENERIC', '''Generic micro air vehicle.''')
MAV_TYPE_FIXED_WING = 1 # Fixed wing aircraft.
enums['MAV_TYPE'][1] = EnumEntry('MAV_TYPE_FIXED_WING', '''Fixed wing aircraft.''')
MAV_TYPE_QUADROTOR = 2 # Quadrotor
enums['MAV_TYPE'][2] = EnumEntry('MAV_TYPE_QUADROTOR', '''Quadrotor''')
MAV_TYPE_COAXIAL = 3 # Coaxial helicopter
enums['MAV_TYPE'][3] = EnumEntry('MAV_TYPE_COAXIAL', '''Coaxial helicopter''')
MAV_TYPE_HELICOPTER = 4 # Normal helicopter with tail rotor.
enums['MAV_TYPE'][4] = EnumEntry('MAV_TYPE_HELICOPTER', '''Normal helicopter with tail rotor.''')
MAV_TYPE_VTOL_TILTROTOR = 21 # Tiltrotor VTOL
enums['MAV_TYPE'][21] = EnumEntry('MAV_TYPE_VTOL_TILTROTOR', '''Tiltrotor VTOL''')
MAV_TYPE_VTOL_RESERVED2 = 22 # VTOL reserved 2
enums['MAV_TYPE'][22] = EnumEntry('MAV_TYPE_VTOL_RESERVED2', '''VTOL reserved 2''')
MAV_TYPE_VTOL_RESERVED3 = 23 # VTOL reserved 3
enums['MAV_TYPE'][23] = EnumEntry('MAV_TYPE_VTOL_RESERVED3', '''VTOL reserved 3''')
MAV_TYPE_VTOL_RESERVED4 = 24 # VTOL reserved 4
enums['MAV_TYPE'][24] = EnumEntry('MAV_TYPE_VTOL_RESERVED4', '''VTOL reserved 4''')
MAV_TYPE_ENUM_END = 25 # end marker, empty description
enums['MAV_TYPE'][25] = EnumEntry('MAV_TYPE_ENUM_END', '''''')
# FIRMWARE_VERSION_TYPE
enums['FIRMWARE_VERSION_TYPE'] = {}
FIRMWARE_VERSION_TYPE_DEV = 0 # development release
enums['FIRMWARE_VERSION_TYPE'][0] = EnumEntry('FIRMWARE_VERSION_TYPE_DEV', '''development release''')
FIRMWARE_VERSION_TYPE_ALPHA = 64 # alpha release
enums['FIRMWARE_VERSION_TYPE'][64] = EnumEntry('FIRMWARE_VERSION_TYPE_ALPHA', '''alpha release''')
FIRMWARE_VERSION_TYPE_BETA = 128 # beta release
enums['FIRMWARE_VERSION_TYPE'][128] = EnumEntry('FIRMWARE_VERSION_TYPE_BETA', '''beta release''')
FIRMWARE_VERSION_TYPE_RC = 192 # release candidate
enums['FIRMWARE_VERSION_TYPE'][192] = EnumEntry('FIRMWARE_VERSION_TYPE_RC', '''release candidate''')
FIRMWARE_VERSION_TYPE_OFFICIAL = 255 # official stable release
enums['FIRMWARE_VERSION_TYPE'][255] = EnumEntry('FIRMWARE_VERSION_TYPE_OFFICIAL', '''official stable release''')
FIRMWARE_VERSION_TYPE_ENUM_END = 256 # end marker, empty description
enums['FIRMWARE_VERSION_TYPE'][256] = EnumEntry('FIRMWARE_VERSION_TYPE_ENUM_END', '''''')
# MAV_CMD
# Command entries additionally document their parameters:
# .param[n] holds the description of parameter number n (1 to 7).
enums['MAV_CMD'] = {}
MAV_CMD_NAV_WAYPOINT = 16 # Navigate to MISSION.
enums['MAV_CMD'][16] = EnumEntry('MAV_CMD_NAV_WAYPOINT', '''Navigate to MISSION.''')
enums['MAV_CMD'][16].param[1] = '''Hold time in decimal seconds. (ignored by fixed wing, time to stay at MISSION for rotary wing)'''
enums['MAV_CMD'][16].param[2] = '''Acceptance radius in meters (if the sphere with this radius is hit, the MISSION counts as reached)'''
enums['MAV_CMD'][16].param[3] = '''0 to pass through the WP, if > 0 radius in meters to pass by WP. Positive value for clockwise orbit, negative value for counter-clockwise orbit. Allows trajectory control.'''
enums['MAV_CMD'][16].param[4] = '''Desired yaw angle at MISSION (rotary wing)'''
enums['MAV_CMD'][16].param[5] = '''Latitude'''
enums['MAV_CMD'][16].param[6] = '''Longitude'''
enums['MAV_CMD'][16].param[7] = '''Altitude'''
MAV_CMD_NAV_GUIDED_ENABLE = 92 # hand control over to an external controller
enums['MAV_CMD'][92] = EnumEntry('MAV_CMD_NAV_GUIDED_ENABLE', '''hand control over to an external controller''')
enums['MAV_CMD'][92].param[1] = '''On / Off (> 0.5f on)'''
enums['MAV_CMD'][92].param[2] = '''Empty'''
enums['MAV_CMD'][92].param[3] = '''Empty'''
enums['MAV_CMD'][92].param[4] = '''Empty'''
enums['MAV_CMD'][92].param[5] = '''Empty'''
enums['MAV_CMD'][92].param[6] = '''Empty'''
enums['MAV_CMD'][92].param[7] = '''Empty'''
MAV_CMD_DO_GO_AROUND = 191 # Mission command to safely abort an autonomous landing.
enums['MAV_CMD'][191] = EnumEntry('MAV_CMD_DO_GO_AROUND', '''Mission command to safely abort an autonmous landing.''')
enums['MAV_CMD'][191].param[1] = '''Altitude (meters)'''
enums['MAV_CMD'][191].param[2] = '''Empty'''
enums['MAV_CMD'][191].param[3] = '''Empty'''
enums['MAV_CMD'][191].param[4] = '''Empty'''
enums['MAV_CMD'][191].param[5] = '''Empty'''
enums['MAV_CMD'][191].param[6] = '''Empty'''
enums['MAV_CMD'][191].param[7] = '''Empty'''
MAV_CMD_ENUM_END = 192 # end marker, empty description
enums['MAV_CMD'][192] = EnumEntry('MAV_CMD_ENUM_END', '''''')
# message IDs
# Numeric id of each message type; the generated message classes below use
# these as their `id` class attribute.
# NOTE(review): the two negative values are presumably pseudo-ids for
# locally created "bad data" / "unknown" objects rather than ids that
# appear on the wire - confirm against the classes that use them.
MAVLINK_MSG_ID_BAD_DATA = -1
MAVLINK_MSG_ID_UNKNOWN = -2
MAVLINK_MSG_ID_HEARTBEAT = 0
MAVLINK_MSG_ID_SYS_STATUS = 1
MAVLINK_MSG_ID_SYSTEM_TIME = 2
MAVLINK_MSG_ID_PING = 4
class MAVLink_heartbeat_message(MAVLink_message):
    '''
    HEARTBEAT: shows that a system is present and responding. The MAV
    type and autopilot type let the receiving system treat further
    messages from this system appropriately (e.g. by laying out the
    user interface based on the autopilot).
    '''
    id = MAVLINK_MSG_ID_HEARTBEAT
    name = 'HEARTBEAT'
    # fields in declaration order, and in the order they are packed
    fieldnames = ['type', 'autopilot', 'base_mode', 'custom_mode', 'system_status', 'mavlink_version']
    ordered_fieldnames = ['custom_mode', 'type', 'autopilot', 'base_mode', 'system_status', 'mavlink_version']
    fieldtypes = ['uint8_t', 'uint8_t', 'uint8_t', 'uint32_t', 'uint8_t', 'uint8_t']
    fielddisplays_by_name = {}
    fieldenums_by_name = {}
    fieldunits_by_name = {}
    # struct layout of the payload; the derived forms below reuse it
    format = '<IBBBBB'
    native_format = bytearray(format, 'ascii')
    orders = [1, 2, 3, 0, 4, 5]
    lengths = [1] * 6
    array_lengths = [0] * 6
    crc_extra = 50
    unpacker = struct.Struct(format)
    instance_field = None
    instance_offset = -1

    def __init__(self, type, autopilot, base_mode, custom_mode, system_status, mavlink_version):
        '''Initialise the base message and store the six field values.'''
        cls = MAVLink_heartbeat_message
        MAVLink_message.__init__(self, cls.id, cls.name)
        self._fieldnames = cls.fieldnames
        self._instance_field = cls.instance_field
        self._instance_offset = cls.instance_offset
        self.type = type
        self.autopilot = autopilot
        self.base_mode = base_mode
        self.custom_mode = custom_mode
        self.system_status = system_status
        self.mavlink_version = mavlink_version

    def pack(self, mav, force_mavlink1=False):
        '''Pack the fields in wire order and delegate to MAVLink_message.pack.'''
        payload = struct.pack('<IBBBBB', self.custom_mode, self.type, self.autopilot, self.base_mode, self.system_status, self.mavlink_version)
        return MAVLink_message.pack(self, mav, 50, payload, force_mavlink1=force_mavlink1)
class MAVLink_sys_status_message(MAVLink_message):
    '''
    SYS_STATUS: the general system state. It reports which onboard
    controllers and sensors are present, enabled and healthy, the
    main-loop load, battery voltage, current and remaining energy, and
    communication drop and error counters.
    '''
    id = MAVLINK_MSG_ID_SYS_STATUS
    name = 'SYS_STATUS'
    # fields in declaration order, and in the order they are packed
    fieldnames = ['onboard_control_sensors_present', 'onboard_control_sensors_enabled', 'onboard_control_sensors_health', 'load', 'voltage_battery', 'current_battery', 'battery_remaining', 'drop_rate_comm', 'errors_comm', 'errors_count1', 'errors_count2', 'errors_count3', 'errors_count4']
    ordered_fieldnames = ['onboard_control_sensors_present', 'onboard_control_sensors_enabled', 'onboard_control_sensors_health', 'load', 'voltage_battery', 'current_battery', 'drop_rate_comm', 'errors_comm', 'errors_count1', 'errors_count2', 'errors_count3', 'errors_count4', 'battery_remaining']
    fieldtypes = ['uint32_t', 'uint32_t', 'uint32_t', 'uint16_t', 'uint16_t', 'int16_t', 'int8_t', 'uint16_t', 'uint16_t', 'uint16_t', 'uint16_t', 'uint16_t', 'uint16_t']
    fielddisplays_by_name = {}
    fieldenums_by_name = {}
    fieldunits_by_name = {}
    # struct layout of the payload; the derived forms below reuse it
    format = '<IIIHHhHHHHHHb'
    native_format = bytearray(format, 'ascii')
    orders = [0, 1, 2, 3, 4, 5, 12, 6, 7, 8, 9, 10, 11]
    lengths = [1] * 13
    array_lengths = [0] * 13
    crc_extra = 124
    unpacker = struct.Struct(format)
    instance_field = None
    instance_offset = -1

    def __init__(self, onboard_control_sensors_present, onboard_control_sensors_enabled, onboard_control_sensors_health, load, voltage_battery, current_battery, battery_remaining, drop_rate_comm, errors_comm, errors_count1, errors_count2, errors_count3, errors_count4):
        '''Initialise the base message and store the thirteen field values.'''
        cls = MAVLink_sys_status_message
        MAVLink_message.__init__(self, cls.id, cls.name)
        self._fieldnames = cls.fieldnames
        self._instance_field = cls.instance_field
        self._instance_offset = cls.instance_offset
        self.onboard_control_sensors_present = onboard_control_sensors_present
        self.onboard_control_sensors_enabled = onboard_control_sensors_enabled
        self.onboard_control_sensors_health = onboard_control_sensors_health
        self.load = load
        self.voltage_battery = voltage_battery
        self.current_battery = current_battery
        self.battery_remaining = battery_remaining
        self.drop_rate_comm = drop_rate_comm
        self.errors_comm = errors_comm
        self.errors_count1 = errors_count1
        self.errors_count2 = errors_count2
        self.errors_count3 = errors_count3
        self.errors_count4 = errors_count4

    def pack(self, mav, force_mavlink1=False):
        '''Pack the fields in wire order and delegate to MAVLink_message.pack.'''
        payload = struct.pack(
            '<IIIHHhHHHHHHb',
            self.onboard_control_sensors_present,
            self.onboard_control_sensors_enabled,
            self.onboard_control_sensors_health,
            self.load,
            self.voltage_battery,
            self.current_battery,
            self.drop_rate_comm,
            self.errors_comm,
            self.errors_count1,
            self.errors_count2,
            self.errors_count3,
            self.errors_count4,
            self.battery_remaining,
        )
        return MAVLink_message.pack(self, mav, 124, payload, force_mavlink1=force_mavlink1)
class MAVLink_system_time_message(MAVLink_message):
    '''
    The system time is the time of the master clock, typically the
    computer clock of the main onboard computer.
    '''
    # message identity
    id = MAVLINK_MSG_ID_SYSTEM_TIME
    name = 'SYSTEM_TIME'
    crc_extra = 137

    # field metadata
    fieldnames = ['time_unix_usec', 'time_boot_ms']
    ordered_fieldnames = ['time_unix_usec', 'time_boot_ms']
    fieldtypes = ['uint64_t', 'uint32_t']
    fielddisplays_by_name = {}
    fieldenums_by_name = {}
    fieldunits_by_name = {}
    orders = [0, 1]
    lengths = [1, 1]
    array_lengths = [0, 0]

    # wire layout; the native and struct forms are built from the same format string
    format = '<QI'
    native_format = bytearray(format, 'ascii')
    unpacker = struct.Struct(format)

    instance_field = None
    instance_offset = -1

    def __init__(self, time_unix_usec, time_boot_ms):
        '''Initialise the base message and store both field values on the instance.'''
        cls = MAVLink_system_time_message
        MAVLink_message.__init__(self, cls.id, cls.name)
        self._fieldnames = cls.fieldnames
        self._instance_field = cls.instance_field
        self._instance_offset = cls.instance_offset
        self.time_unix_usec = time_unix_usec
        self.time_boot_ms = time_boot_ms

    def pack(self, mav, force_mavlink1=False):
        '''Serialise the fields as '<QI' and pass the payload to MAVLink_message.pack with CRC-extra 137.'''
        payload = struct.pack('<QI', self.time_unix_usec, self.time_boot_ms)
        return MAVLink_message.pack(self, mav, 137, payload, force_mavlink1=force_mavlink1)
class MAVLink_ping_message(MAVLink_message):
    '''
    A ping message either requesting or responding to a ping. This
    allows measuring the system latencies, including serial port,
    radio modem and UDP connections.
    '''
    # message identity
    id = MAVLINK_MSG_ID_PING
    name = 'PING'
    crc_extra = 237

    # field metadata
    fieldnames = ['time_usec', 'seq', 'target_system', 'target_component']
    ordered_fieldnames = ['time_usec', 'seq', 'target_system', 'target_component']
    fieldtypes = ['uint64_t', 'uint32_t', 'uint8_t', 'uint8_t']
    fielddisplays_by_name = {}
    fieldenums_by_name = {}
    fieldunits_by_name = {}
    orders = [0, 1, 2, 3]
    lengths = [1, 1, 1, 1]
    array_lengths = [0, 0, 0, 0]

    # wire layout; the native and struct forms are built from the same format string
    format = '<QIBB'
    native_format = bytearray(format, 'ascii')
    unpacker = struct.Struct(format)

    instance_field = None
    instance_offset = -1

    def __init__(self, time_usec, seq, target_system, target_component):
        '''Initialise the base message and store the four field values on the instance.'''
        cls = MAVLink_ping_message
        MAVLink_message.__init__(self, cls.id, cls.name)
        self._fieldnames = cls.fieldnames
        self._instance_field = cls.instance_field
        self._instance_offset = cls.instance_offset
        self.time_usec = time_usec
        self.seq = seq
        self.target_system = target_system
        self.target_component = target_component

    def pack(self, mav, force_mavlink1=False):
        '''Serialise the fields as '<QIBB' and pass the payload to MAVLink_message.pack with CRC-extra 237.'''
        payload = struct.pack('<QIBB', self.time_usec, self.seq, self.target_system, self.target_component)
        return MAVLink_message.pack(self, mav, 237, payload, force_mavlink1=force_mavlink1)
# Lookup table used by MAVLink.decode: message id -> message class.
# Ids that are not listed here are decoded as MAVLink_unknown.
mavlink_map = {
    MAVLINK_MSG_ID_HEARTBEAT: MAVLink_heartbeat_message,
    MAVLINK_MSG_ID_SYS_STATUS: MAVLink_sys_status_message,
    MAVLINK_MSG_ID_SYSTEM_TIME: MAVLink_system_time_message,
    MAVLINK_MSG_ID_PING: MAVLink_ping_message,
}
class MAVError(Exception):
    '''MAVLink error class; the error text is also exposed as the ``message`` attribute.'''

    def __init__(self, msg):
        # Keep the text on ``message`` too: handlers in this module read
        # ``reason.message`` rather than ``str(reason)``.
        self.message = msg
        Exception.__init__(self, msg)
class MAVString(str):
    '''NUL terminated string: ``str()`` of an instance stops at the first NUL character.'''

    def __init__(self, s):
        # The value itself is set by str.__new__; nothing else to initialise.
        str.__init__(self)

    def __str__(self):
        '''Return the text before the first NUL, or the whole text when there is none.'''
        nul_at = self.find(chr(0))
        return self[:] if nul_at == -1 else self[0:nul_at]
class MAVLink_bad_data(MAVLink_message):
    '''
    a piece of bad data in a mavlink stream, kept together with the
    reason it was rejected
    '''

    def __init__(self, data, reason):
        MAVLink_message.__init__(self, MAVLINK_MSG_ID_BAD_DATA, 'BAD_DATA')
        self._fieldnames = ['data', 'reason']
        self._instance_field = None
        self.reason = reason
        # the raw bytes double as the message buffer
        self.data = data
        self._msgbuf = data

    def __str__(self):
        '''Render the reason and the data as a list of hex strings, since bad data is often non-printable.'''
        hex_items = []
        for item in self.data:
            # items may be 1-character strings or ints depending on the container
            value = ord(item) if isinstance(item, str) else item
            hex_items.append('%x' % value)
        return '%s {%s, data:%s}' % (self._type, self.reason, hex_items)
class MAVLink_unknown(MAVLink_message):
    '''
    a message that we don't have in the XML used when built; the raw
    frame is kept as ``data``
    '''

    def __init__(self, msgid, data):
        MAVLink_message.__init__(self, MAVLINK_MSG_ID_UNKNOWN, 'UNKNOWN_%u' % msgid)
        self._fieldnames = ['data']
        self._instance_field = None
        # the raw bytes double as the message buffer
        self.data = data
        self._msgbuf = data

    def __str__(self):
        '''Render the data as a list of hex strings, since it is often non-printable.'''
        hex_items = []
        for item in self.data:
            # items may be 1-character strings or ints depending on the container
            value = ord(item) if isinstance(item, str) else item
            hex_items.append('%x' % value)
        return '%s {data:%s}' % (self._type, hex_items)
class MAVLinkSigning(object):
    '''MAVLink signing state class'''

    def __init__(self):
        # key material and outgoing-link settings
        self.secret_key = None
        self.link_id = 0
        self.sign_outgoing = False
        self.timestamp = 0

        # optional hook, called as callback(mav, msgId), that decides whether
        # a message without a valid signature is still accepted
        self.allow_unsigned_callback = None

        # timestamps recorded per (link_id, srcSystem, srcComponent) stream
        self.stream_timestamps = {}

        # statistics counters
        self.sig_count = 0
        self.goodsig_count = 0
        self.badsig_count = 0
        self.unsigned_count = 0
        self.reject_count = 0
class MAVLink(object):
'''MAVLink protocol handling class'''
def __init__(self, file, srcSystem=0, srcComponent=0, use_native=False):
    '''
    Set up the sender identity, parser state, statistics and signing state.

    file         : object whose write() method receives packed messages
    srcSystem    : system id stored for outgoing messages
    srcComponent : component id stored for outgoing messages
    use_native   : ask for the mavnative parser; it is only used when
                   native support is available
    '''
    # sender identity and output
    self.file = file
    self.srcSystem = srcSystem
    self.srcComponent = srcComponent
    self.seq = 0

    # receive / send hooks
    self.callback = None
    self.callback_args = None
    self.callback_kwargs = None
    self.send_callback = None
    self.send_callback_args = None
    self.send_callback_kwargs = None

    # parser state
    self.buf = bytearray()
    self.buf_index = 0
    self.expected_length = HEADER_LEN_V1+2
    self.have_prefix_error = False
    self.robust_parsing = False

    # protocol options
    self.protocol_marker = 253
    self.little_endian = True
    self.crc_extra = True
    self.sort_fields = True

    # statistics
    self.total_packets_sent = 0
    self.total_bytes_sent = 0
    self.total_packets_received = 0
    self.total_bytes_received = 0
    self.total_receive_errors = 0
    self.startup_time = time.time()

    self.signing = MAVLinkSigning()

    # optional native parser
    if native_supported and (use_native or native_testing or native_force):
        print("NOTE: mavnative is currently beta-test code")
        self.native = mavnative.NativeConnection(MAVLink_message, mavlink_map)
    else:
        self.native = None
    if native_testing:
        # separate buffer fed to the native parser when both parsers are compared
        self.test_buf = bytearray()

    # pre-compiled struct layouts used while parsing
    self.mav10_unpacker = struct.Struct('<cBBBBB')
    self.mav20_unpacker = struct.Struct('<cBBBBBBHB')
    self.mav20_h3_unpacker = struct.Struct('BBB')
    self.mav_csum_unpacker = struct.Struct('<H')
    self.mav_sign_unpacker = struct.Struct('<IH')
def set_callback(self, callback, *args, **kwargs):
    '''
    Register the function called for every received message.

    It is invoked as callback(msg, *args, **kwargs).
    '''
    self.callback, self.callback_args, self.callback_kwargs = callback, args, kwargs
def set_send_callback(self, callback, *args, **kwargs):
    '''
    Register the function called after every sent message.

    It is invoked as callback(mavmsg, *args, **kwargs).
    '''
    self.send_callback, self.send_callback_args, self.send_callback_kwargs = callback, args, kwargs
def send(self, mavmsg, force_mavlink1=False):
    '''
    send a MAVLink message

    Packs the message, writes the bytes to self.file, advances the 8-bit
    sequence number and the send counters, then calls the send callback
    if one is registered.
    '''
    packed = mavmsg.pack(self, force_mavlink1=force_mavlink1)
    self.file.write(packed)

    # the sequence number wraps at 256
    self.seq = (self.seq + 1) % 256
    self.total_packets_sent += 1
    self.total_bytes_sent += len(packed)

    notify = self.send_callback
    if not notify:
        return
    notify(mavmsg, *self.send_callback_args, **self.send_callback_kwargs)
def buf_len(self):
    '''return the number of buffered bytes that have not been consumed yet'''
    total = len(self.buf)
    consumed = self.buf_index
    return total - consumed
def bytes_needed(self):
    '''
    return number of bytes needed for next parsing stage

    The expected length comes from the native parser when one is in use,
    otherwise from this object. The result is never less than 1.
    '''
    source = self.native if self.native else self
    missing = source.expected_length - self.buf_len()
    return 1 if missing <= 0 else missing
def __parse_char_native(self, c):
    '''Thin wrapper around the native parser, kept as its own method so it shows up separately in profiling results.'''
    return self.native.parse_chars(c)
def __callbacks(self, msg):
    '''Invoke the receive callback, if any; a separate method only to make profiling results easier to read.'''
    handler = self.callback
    if not handler:
        return
    handler(msg, *self.callback_args, **self.callback_kwargs)
def parse_char(self, c):
    '''
    input some data bytes, possibly returning a new message

    The bytes are appended to the receive buffer and one parsing attempt is
    made. Returns the parsed message, or None when no complete message is
    available yet.
    '''
    self.buf.extend(c)
    self.total_bytes_received += len(c)

    if not self.native:
        msg = self.__parse_char_legacy()
    elif native_testing:
        # run both parsers on the same input and insist that they agree
        self.test_buf.extend(c)
        msg = self.__parse_char_native(self.test_buf)
        legacy_msg = self.__parse_char_legacy()
        if legacy_msg != msg:
            print("Native: %s\nLegacy: %s\n" % (msg, legacy_msg))
            raise Exception('Native vs. Legacy mismatch')
    else:
        msg = self.__parse_char_native(self.buf)

    if msg is not None:
        self.total_packets_received += 1
        self.__callbacks(msg)
    elif self.buf_len() == 0 and self.buf_index != 0:
        # everything buffered has been consumed: start a fresh buffer so the
        # memory held by the old one can be released
        self.buf = bytearray()
        self.buf_index = 0
    return msg
def __parse_char_legacy(self):
    '''
    input some data bytes, possibly returning a new message (uses no native code)

    Looks at the unread part of self.buf. Returns a decoded message, a
    MAVLink_bad_data object (robust parsing only), or None when more bytes
    are needed. Without robust parsing, MAVError is raised for a bad prefix
    (once per run of bad bytes) and for frames that fail to decode.
    '''
    header_len = HEADER_LEN_V1
    if self.buf_len() >= 1:
        first = self.buf[self.buf_index]
        if first == PROTOCOL_MARKER_V2:
            header_len = HEADER_LEN_V2
        elif first != PROTOCOL_MARKER_V1:
            # not a start-of-frame marker: consume the byte and report it
            self.buf_index += 1
            if self.robust_parsing:
                bad = MAVLink_bad_data(bytearray([first]), 'Bad prefix')
                self.expected_length = header_len+2
                self.total_receive_errors += 1
                return bad
            if self.have_prefix_error:
                # already reported for this run of bad bytes
                return None
            self.have_prefix_error = True
            self.total_receive_errors += 1
            raise MAVError("invalid MAVLink prefix '%s'" % first)
    self.have_prefix_error = False

    if self.buf_len() >= 3:
        # marker, payload length and incompat flags determine the frame size
        head = self.buf[self.buf_index:self.buf_index + 3]
        if sys.version_info.major < 3:
            head = str(head)
        magic, payload_len, incompat_flags = self.mav20_h3_unpacker.unpack(head)
        frame_len = payload_len + header_len + 2
        if magic == PROTOCOL_MARKER_V2 and (incompat_flags & MAVLINK_IFLAG_SIGNED):
            frame_len += MAVLINK_SIGNATURE_BLOCK_LEN
        self.expected_length = frame_len

    frame_len = self.expected_length
    if frame_len < (header_len+2) or self.buf_len() < frame_len:
        return None

    # a whole frame is buffered: take it out and reset for the next one
    start = self.buf_index
    mbuf = array.array('B', self.buf[start:start + frame_len])
    self.buf_index = start + frame_len
    self.expected_length = header_len+2

    robust = self.robust_parsing
    try:
        if magic == PROTOCOL_MARKER_V2 and (incompat_flags & ~MAVLINK_IFLAG_SIGNED) != 0:
            raise MAVError('invalid incompat_flags 0x%x 0x%x %u' % (incompat_flags, magic, self.expected_length))
        return self.decode(mbuf)
    except MAVError as reason:
        if not robust:
            raise
        # robust parsing turns decode failures into bad-data pseudo-messages
        bad = MAVLink_bad_data(mbuf, reason.message)
        self.total_receive_errors += 1
        return bad
def parse_buffer(self, s):
    '''
    input some data bytes, possibly returning a list of new messages

    Returns None when the input does not complete any message; otherwise
    a list with every message that could be parsed from the buffer.
    '''
    first = self.parse_char(s)
    if first is None:
        return None
    messages = [first]
    # keep parsing without adding input until the buffer yields nothing more
    following = self.parse_char("")
    while following is not None:
        messages.append(following)
        following = self.parse_char("")
    return messages
def check_signature(self, msgbuf, srcSystem, srcComponent):
    '''
    check signature on incoming message

    msgbuf       : the complete signed frame; its last 13 bytes are the
                   link id (1 byte), timestamp (6 bytes, little endian)
                   and signature (6 bytes)
    srcSystem    : system id taken from the frame header
    srcComponent : component id taken from the frame header

    Returns True when the timestamp is acceptable for the
    (link_id, srcSystem, srcComponent) stream and the signature matches the
    first 6 bytes of SHA-256(secret_key + frame without signature).

    Signing state is only modified for frames that pass both checks: the
    stream's last timestamp is then recorded and self.signing.timestamp is
    raised to at least the received timestamp. Rejected frames leave the
    state untouched, so a forged frame cannot register a stream and an
    accepted frame cannot be replayed.
    '''
    import hmac  # local import: only needed for the constant-time comparison

    if isinstance(msgbuf, array.array):
        # tobytes() is the current name; tostring() is the fallback for
        # array implementations that do not provide it
        try:
            msgbuf = msgbuf.tobytes()
        except AttributeError:
            msgbuf = msgbuf.tostring()

    timestamp_buf = msgbuf[-12:-6]
    link_id = msgbuf[-13]
    (tlow, thigh) = self.mav_sign_unpacker.unpack(timestamp_buf)
    timestamp = tlow + (thigh << 32)

    signing = self.signing
    stream_key = (link_id, srcSystem, srcComponent)

    # see if the timestamp is acceptable
    last_seen = signing.stream_timestamps.get(stream_key)
    if last_seen is not None:
        if timestamp <= last_seen:
            # reject old timestamp
            return False
    elif timestamp + 6000*1000 < signing.timestamp:
        # a new stream has appeared. Accept the timestamp only if it is at
        # most one minute behind our current timestamp
        return False

    h = hashlib.new('sha256')
    h.update(signing.secret_key)
    h.update(msgbuf[:-6])
    expected = h.digest()[:6]
    received = bytes(msgbuf[-6:])
    # constant-time comparison so timing does not reveal how many leading
    # signature bytes matched
    if not hmac.compare_digest(expected, received):
        return False

    # the frame is authentic: remember the last timestamp of this stream
    signing.stream_timestamps[stream_key] = timestamp
    # the timestamp we next send with is the max of the received timestamp
    # and our current timestamp
    signing.timestamp = max(signing.timestamp, timestamp)
    return True
def decode(self, msgbuf):
    '''
    decode a buffer as a MAVLink message

    msgbuf : one complete frame, from the marker byte through the checksum
             and, for signed frames, the signature block. Slices of it are
             appended to and zero-padded, so it must be a mutable sequence
             type such as the array.array('B') built by the parser.

    Returns an instance of the message class registered in mavlink_map for
    the frame's message id, or a MAVLink_unknown wrapping the raw frame
    when the id is not registered. The instance carries _header, _payload,
    _crc, _msgbuf and _signed (plus _link_id for accepted signed frames).

    Raises MAVError for a malformed header, wrong length, CRC mismatch
    (unless MAVLINK_IGNORE_CRC is set), rejected signature, or a payload
    that cannot be unpacked or instantiated.
    '''
    # decode the header
    if msgbuf[0] != PROTOCOL_MARKER_V1:
        headerlen = HEADER_LEN_V2
        try:
            magic, mlen, incompat_flags, compat_flags, seq, srcSystem, srcComponent, msgIdlow, msgIdhigh = self.mav20_unpacker.unpack(msgbuf[:headerlen])
        except struct.error as emsg:
            raise MAVError('Unable to unpack MAVLink header: %s' % emsg)
        # the 24-bit message id is split into a 16-bit low part and an 8-bit high part
        msgId = msgIdlow | (msgIdhigh << 16)
    else:
        headerlen = HEADER_LEN_V1
        try:
            magic, mlen, seq, srcSystem, srcComponent, msgId = self.mav10_unpacker.unpack(msgbuf[:headerlen])
        except struct.error as emsg:
            raise MAVError('Unable to unpack MAVLink header: %s' % emsg)
        # the short header carries no flag bytes
        incompat_flags = 0
        compat_flags = 0

    if (incompat_flags & MAVLINK_IFLAG_SIGNED) != 0:
        signature_len = MAVLINK_SIGNATURE_BLOCK_LEN
    else:
        signature_len = 0
    # bytes following the payload: 2-byte checksum plus optional signature
    trailer_len = 2 + signature_len

    if ord(magic) != PROTOCOL_MARKER_V1 and ord(magic) != PROTOCOL_MARKER_V2:
        raise MAVError("invalid MAVLink prefix '%s'" % magic)
    actual_len = len(msgbuf) - (headerlen + trailer_len)
    if mlen != actual_len:
        raise MAVError('invalid MAVLink message length. Got %u expected %u, msgId=%u headerlen=%u' % (actual_len, mlen, msgId, headerlen))

    if msgId not in mavlink_map:
        return MAVLink_unknown(msgId, msgbuf)

    # look up how to decode the payload
    msg_class = mavlink_map[msgId]
    fmt = msg_class.format
    order_map = msg_class.orders
    len_map = msg_class.lengths
    crc_extra = msg_class.crc_extra

    # decode the checksum
    try:
        crc, = self.mav_csum_unpacker.unpack(msgbuf[-trailer_len:][:2])
    except struct.error as emsg:
        raise MAVError('Unable to unpack MAVLink CRC: %s' % emsg)
    # the checksum covers everything after the marker byte up to the end of
    # the payload, followed by the per-message CRC-extra byte
    crcbuf = msgbuf[1:-trailer_len]
    crcbuf.append(crc_extra)
    crc2 = x25crc(crcbuf)
    if crc != crc2.crc and not MAVLINK_IGNORE_CRC:
        raise MAVError('invalid MAVLink CRC in msgID %u 0x%04x should be 0x%04x' % (msgId, crc, crc2.crc))

    # signature handling
    sig_ok = False
    if signature_len == MAVLINK_SIGNATURE_BLOCK_LEN:
        self.signing.sig_count += 1
    if self.signing.secret_key is not None:
        accept_signature = False
        if signature_len == MAVLINK_SIGNATURE_BLOCK_LEN:
            sig_ok = self.check_signature(msgbuf, srcSystem, srcComponent)
            accept_signature = sig_ok
            if sig_ok:
                self.signing.goodsig_count += 1
            else:
                self.signing.badsig_count += 1
            if not accept_signature and self.signing.allow_unsigned_callback is not None:
                # badly signed: let the application decide whether to accept anyway
                accept_signature = self.signing.allow_unsigned_callback(self, msgId)
                if accept_signature:
                    self.signing.unsigned_count += 1
                else:
                    self.signing.reject_count += 1
        elif self.signing.allow_unsigned_callback is not None:
            # unsigned: let the application decide whether to accept
            accept_signature = self.signing.allow_unsigned_callback(self, msgId)
            if accept_signature:
                self.signing.unsigned_count += 1
            else:
                self.signing.reject_count += 1
        if not accept_signature:
            raise MAVError('Invalid signature')

    # unpack the payload
    csize = msg_class.unpacker.size
    mbuf = msgbuf[headerlen:-trailer_len]
    if len(mbuf) < csize:
        # zero pad to give right size
        mbuf.extend([0] * (csize - len(mbuf)))
    mbuf = mbuf[:csize]
    try:
        unpacked = msg_class.unpacker.unpack(mbuf)
    except struct.error as emsg:
        raise MAVError('Unable to unpack MAVLink payload type=%s fmt=%s payloadLength=%u: %s' % (
            msg_class, fmt, len(mbuf), emsg))

    # handle sorted fields: map wire order back to declaration order
    if sum(len_map) == len(len_map):
        # message has no arrays in it
        tlist = [unpacked[order_map[i]] for i in range(len(unpacked))]
    else:
        # message has some arrays
        tlist = []
        for order in order_map:
            count = len_map[order]
            tip = sum(len_map[:order])
            field = unpacked[tip]
            if count == 1 or isinstance(field, str):
                tlist.append(field)
            else:
                tlist.append(unpacked[tip:(tip + count)])

    # terminate any strings
    for i, value in enumerate(tlist):
        if msg_class.fieldtypes[i] == 'char':
            if sys.version_info.major >= 3:
                value = to_string(value)
            tlist[i] = str(MAVString(value))

    # construct the message object
    try:
        m = msg_class(*tlist)
    except Exception as emsg:
        raise MAVError('Unable to instantiate MAVLink message of type %s : %s' % (msg_class, emsg))
    m._signed = sig_ok
    if m._signed:
        m._link_id = msgbuf[-13]
    m._msgbuf = msgbuf
    # the payload starts after the header, whose length depends on the
    # protocol version of the frame
    m._payload = msgbuf[headerlen:-trailer_len]
    m._crc = crc
    m._header = MAVLink_header(msgId, incompat_flags, compat_flags, mlen, seq, srcSystem, srcComponent)
    return m
def heartbeat_encode(self, type, autopilot, base_mode, custom_mode, system_status, mavlink_version=3):
    '''
    Build a heartbeat message without sending it.

    The heartbeat message shows that a system is present and responding.
    The type of the MAV and autopilot hardware allow the receiving system
    to treat further messages from this system appropriately (e.g. by
    laying out the user interface based on the autopilot).

    type            : Type of the MAV (quadrotor, helicopter, etc., up to 15 types, defined in MAV_TYPE ENUM) (type:uint8_t)
    autopilot       : Autopilot type / class. defined in MAV_AUTOPILOT ENUM (type:uint8_t)
    base_mode       : System mode bitfield, see MAV_MODE_FLAG ENUM in mavlink/include/mavlink_types.h (type:uint8_t)
    custom_mode     : A bitfield for use for autopilot-specific flags. (type:uint32_t)
    system_status   : System status flag, see MAV_STATE ENUM (type:uint8_t)
    mavlink_version : MAVLink version, not writable by user, gets added by protocol because of magic data type: uint8_t_mavlink_version (type:uint8_t)
    '''
    fields = (type, autopilot, base_mode, custom_mode, system_status, mavlink_version)
    return MAVLink_heartbeat_message(*fields)
def heartbeat_send(self, type, autopilot, base_mode, custom_mode, system_status, mavlink_version=3, force_mavlink1=False):
    '''
    Build a heartbeat message with heartbeat_encode() and pass it to send().

    The heartbeat message shows that a system is present and responding.
    The type of the MAV and autopilot hardware allow the receiving system
    to treat further messages from this system appropriately (e.g. by
    laying out the user interface based on the autopilot).

    type            : Type of the MAV (quadrotor, helicopter, etc., up to 15 types, defined in MAV_TYPE ENUM) (type:uint8_t)
    autopilot       : Autopilot type / class. defined in MAV_AUTOPILOT ENUM (type:uint8_t)
    base_mode       : System mode bitfield, see MAV_MODE_FLAG ENUM in mavlink/include/mavlink_types.h (type:uint8_t)
    custom_mode     : A bitfield for use for autopilot-specific flags. (type:uint32_t)
    system_status   : System status flag, see MAV_STATE ENUM (type:uint8_t)
    mavlink_version : MAVLink version, not writable by user, gets added by protocol because of magic data type: uint8_t_mavlink_version (type:uint8_t)
    force_mavlink1  : forwarded unchanged to send()
    '''
    msg = self.heartbeat_encode(type, autopilot, base_mode, custom_mode, system_status, mavlink_version)
    return self.send(msg, force_mavlink1=force_mavlink1)
def sys_status_encode(self, onboard_control_sensors_present, onboard_control_sensors_enabled, onboard_control_sensors_health, load, voltage_battery, current_battery, battery_remaining, drop_rate_comm, errors_comm, errors_count1, errors_count2, errors_count3, errors_count4):
    '''
    Build a system status message without sending it.

    The general system state. If the system is following the MAVLink
    standard, the system state is mainly defined by three orthogonal
    states/modes: The system mode, which is either LOCKED (motors shut
    down and locked), MANUAL (system under RC control), GUIDED (system
    with autonomous position control, position setpoint controlled
    manually) or AUTO (system guided by path/waypoint planner). The
    NAV_MODE defines the current flight state: LIFTOFF (often an open-loop
    maneuver), LANDING, WAYPOINTS or VECTOR. This represents the internal
    navigation state machine. The system status shows whether the system
    is currently active or not and if an emergency occurred. During the
    CRITICAL and EMERGENCY states the MAV is still considered to be
    active, but should start emergency procedures autonomously. After a
    failure occurred it should first move from active to critical to allow
    manual intervention and then move to emergency after a certain
    timeout.

    onboard_control_sensors_present : Bitmask showing which onboard controllers and sensors are present. Value of 0: not present. Value of 1: present. Indices defined by ENUM MAV_SYS_STATUS_SENSOR (type:uint32_t)
    onboard_control_sensors_enabled : Bitmask showing which onboard controllers and sensors are enabled: Value of 0: not enabled. Value of 1: enabled. Indices defined by ENUM MAV_SYS_STATUS_SENSOR (type:uint32_t)
    onboard_control_sensors_health  : Bitmask showing which onboard controllers and sensors are operational or have an error: Value of 0: not enabled. Value of 1: enabled. Indices defined by ENUM MAV_SYS_STATUS_SENSOR (type:uint32_t)
    load              : Maximum usage in percent of the mainloop time, (0%: 0, 100%: 1000) should be always below 1000 (type:uint16_t)
    voltage_battery   : Battery voltage, in millivolts (1 = 1 millivolt) (type:uint16_t)
    current_battery   : Battery current, in 10*milliamperes (1 = 10 milliampere), -1: autopilot does not measure the current (type:int16_t)
    battery_remaining : Remaining battery energy: (0%: 0, 100%: 100), -1: autopilot estimate the remaining battery (type:int8_t)
    drop_rate_comm    : Communication drops in percent, (0%: 0, 100%: 10'000), (UART, I2C, SPI, CAN), dropped packets on all links (packets that were corrupted on reception on the MAV) (type:uint16_t)
    errors_comm       : Communication errors (UART, I2C, SPI, CAN), dropped packets on all links (packets that were corrupted on reception on the MAV) (type:uint16_t)
    errors_count1     : Autopilot-specific errors (type:uint16_t)
    errors_count2     : Autopilot-specific errors (type:uint16_t)
    errors_count3     : Autopilot-specific errors (type:uint16_t)
    errors_count4     : Autopilot-specific errors (type:uint16_t)
    '''
    fields = (
        onboard_control_sensors_present,
        onboard_control_sensors_enabled,
        onboard_control_sensors_health,
        load,
        voltage_battery,
        current_battery,
        battery_remaining,
        drop_rate_comm,
        errors_comm,
        errors_count1,
        errors_count2,
        errors_count3,
        errors_count4,
    )
    return MAVLink_sys_status_message(*fields)
def sys_status_send(self, onboard_control_sensors_present, onboard_control_sensors_enabled, onboard_control_sensors_health, load, voltage_battery, current_battery, battery_remaining, drop_rate_comm, errors_comm, errors_count1, errors_count2, errors_count3, errors_count4, force_mavlink1=False):
    '''
    Build a system status message with sys_status_encode() and pass it to send().

    The general system state. If the system is following the MAVLink
    standard, the system state is mainly defined by three orthogonal
    states/modes: The system mode, which is either LOCKED (motors shut
    down and locked), MANUAL (system under RC control), GUIDED (system
    with autonomous position control, position setpoint controlled
    manually) or AUTO (system guided by path/waypoint planner). The
    NAV_MODE defines the current flight state: LIFTOFF (often an open-loop
    maneuver), LANDING, WAYPOINTS or VECTOR. This represents the internal
    navigation state machine. The system status shows whether the system
    is currently active or not and if an emergency occurred. During the
    CRITICAL and EMERGENCY states the MAV is still considered to be
    active, but should start emergency procedures autonomously. After a
    failure occurred it should first move from active to critical to allow
    manual intervention and then move to emergency after a certain
    timeout.

    onboard_control_sensors_present : Bitmask showing which onboard controllers and sensors are present. Value of 0: not present. Value of 1: present. Indices defined by ENUM MAV_SYS_STATUS_SENSOR (type:uint32_t)
    onboard_control_sensors_enabled : Bitmask showing which onboard controllers and sensors are enabled: Value of 0: not enabled. Value of 1: enabled. Indices defined by ENUM MAV_SYS_STATUS_SENSOR (type:uint32_t)
    onboard_control_sensors_health  : Bitmask showing which onboard controllers and sensors are operational or have an error: Value of 0: not enabled. Value of 1: enabled. Indices defined by ENUM MAV_SYS_STATUS_SENSOR (type:uint32_t)
    load              : Maximum usage in percent of the mainloop time, (0%: 0, 100%: 1000) should be always below 1000 (type:uint16_t)
    voltage_battery   : Battery voltage, in millivolts (1 = 1 millivolt) (type:uint16_t)
    current_battery   : Battery current, in 10*milliamperes (1 = 10 milliampere), -1: autopilot does not measure the current (type:int16_t)
    battery_remaining : Remaining battery energy: (0%: 0, 100%: 100), -1: autopilot estimate the remaining battery (type:int8_t)
    drop_rate_comm    : Communication drops in percent, (0%: 0, 100%: 10'000), (UART, I2C, SPI, CAN), dropped packets on all links (packets that were corrupted on reception on the MAV) (type:uint16_t)
    errors_comm       : Communication errors (UART, I2C, SPI, CAN), dropped packets on all links (packets that were corrupted on reception on the MAV) (type:uint16_t)
    errors_count1     : Autopilot-specific errors (type:uint16_t)
    errors_count2     : Autopilot-specific errors (type:uint16_t)
    errors_count3     : Autopilot-specific errors (type:uint16_t)
    errors_count4     : Autopilot-specific errors (type:uint16_t)
    force_mavlink1    : forwarded unchanged to send()
    '''
    msg = self.sys_status_encode(
        onboard_control_sensors_present,
        onboard_control_sensors_enabled,
        onboard_control_sensors_health,
        load,
        voltage_battery,
        current_battery,
        battery_remaining,
        drop_rate_comm,
        errors_comm,
        errors_count1,
        errors_count2,
        errors_count3,
        errors_count4,
    )
    return self.send(msg, force_mavlink1=force_mavlink1)
def system_time_encode(self, time_unix_usec, time_boot_ms):
    '''
    Build a system time message without sending it.

    The system time is the time of the master clock, typically the
    computer clock of the main onboard computer.

    time_unix_usec : Timestamp of the master clock in microseconds since UNIX epoch. (type:uint64_t)
    time_boot_ms   : Timestamp of the component clock since boot time in milliseconds. (type:uint32_t)
    '''
    fields = (time_unix_usec, time_boot_ms)
    return MAVLink_system_time_message(*fields)
def system_time_send(self, time_unix_usec, time_boot_ms, force_mavlink1=False):
    '''
    Build a system time message with system_time_encode() and pass it to send().

    The system time is the time of the master clock, typically the
    computer clock of the main onboard computer.

    time_unix_usec : Timestamp of the master clock in microseconds since UNIX epoch. (type:uint64_t)
    time_boot_ms   : Timestamp of the component clock since boot time in milliseconds. (type:uint32_t)
    force_mavlink1 : forwarded unchanged to send()
    '''
    msg = self.system_time_encode(time_unix_usec, time_boot_ms)
    return self.send(msg, force_mavlink1=force_mavlink1)
def ping_encode(self, time_usec, seq, target_system, target_component):
    '''
    Build a ping message without sending it.

    A ping message either requesting or responding to a ping. This allows
    measuring the system latencies, including serial port, radio modem and
    UDP connections.

    time_usec        : Unix timestamp in microseconds or since system boot if smaller than MAVLink epoch (1.1.2009) (type:uint64_t)
    seq              : PING sequence (type:uint32_t)
    target_system    : 0: request ping from all receiving systems, if greater than 0: message is a ping response and number is the system id of the requesting system (type:uint8_t)
    target_component : 0: request ping from all receiving components, if greater than 0: message is a ping response and number is the system id of the requesting system (type:uint8_t)
    '''
    fields = (time_usec, seq, target_system, target_component)
    return MAVLink_ping_message(*fields)
def ping_send(self, time_usec, seq, target_system, target_component, force_mavlink1=False):
    '''
    Build a ping message with ping_encode() and pass it to send().

    A ping message either requesting or responding to a ping. This allows
    measuring the system latencies, including serial port, radio modem and
    UDP connections.

    time_usec        : Unix timestamp in microseconds or since system boot if smaller than MAVLink epoch (1.1.2009) (type:uint64_t)
    seq              : PING sequence (type:uint32_t)
    target_system    : 0: request ping from all receiving systems, if greater than 0: message is a ping response and number is the system id of the requesting system (type:uint8_t)
    target_component : 0: request ping from all receiving components, if greater than 0: message is a ping response and number is the system id of the requesting system (type:uint8_t)
    force_mavlink1   : forwarded unchanged to send()
    '''
    msg = self.ping_encode(time_usec, seq, target_system, target_component)
    return self.send(msg, force_mavlink1=force_mavlink1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment