Skip to content

Instantly share code, notes, and snippets.

@patrickelectric
Last active April 19, 2024 19:17
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save patrickelectric/d6264fe309d01b44d5540d75cc45cb83 to your computer and use it in GitHub Desktop.
Save patrickelectric/d6264fe309d01b44d5540d75cc45cb83 to your computer and use it in GitHub Desktop.
Example of using asyncio to run multiple mavlink components
import asyncio
from pymavlink import mavutil
PORT = 6666
CONNECTION_STRING = f'udpout:127.0.0.1:{PORT}'
CONNECTIONS = {}
def get_mavlink_connection(system_id: int, component_id: int):
def create(system_id: int, component_id: int):
return mavutil.mavlink_connection(
CONNECTION_STRING,
source_system=system_id,
source_component=component_id,
autoreconnect=True,
)
if system_id not in CONNECTIONS or component_id not in CONNECTIONS[system_id]:
CONNECTIONS.update({system_id: {component_id: create(system_id, component_id)}})
return CONNECTIONS[system_id][component_id]
async def heartbeat_loop(system_id: int, component_id: int):
mavlink_connection = get_mavlink_connection(system_id, component_id)
while True:
await asyncio.sleep(1)
print(f"{system_id}/{component_id}: sending heartbeat")
mavlink_connection.mav.heartbeat_send(
0,
mavutil.mavlink.MAV_TYPE_CAMERA,
mavutil.mavlink.MAV_AUTOPILOT_GENERIC,
0,
mavutil.mavlink.MAV_STATE_STANDBY,
3
)
async def receive_loop(system_id: int, component_id: int):
mavlink_connection = get_mavlink_connection(system_id, component_id)
while True:
await asyncio.sleep(0.01)
if incoming := mavlink_connection.recv_match():
message = incoming.to_dict()
print(f"{system_id}/{component_id}: got message: {message['mavpackettype']}")
def create_component_tasks(loop: asyncio.AbstractEventLoop, system_id: int, component_id: int) -> [asyncio.Task]:
return [
loop.create_task(heartbeat_loop(system_id, component_id)),
loop.create_task(receive_loop(system_id, component_id)),
]
if __name__ == '__main__':
loop = asyncio.new_event_loop()
loop.run_until_complete(asyncio.wait(
create_component_tasks(loop, mavutil.mavlink.MAV_COMP_ID_AUTOPILOT1, mavutil.mavlink.MAV_COMP_ID_CAMERA) +
create_component_tasks(loop, mavutil.mavlink.MAV_COMP_ID_AUTOPILOT1, mavutil.mavlink.MAV_COMP_ID_CAMERA2)
))
loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment