Skip to content

Instantly share code, notes, and snippets.

@gbin
Last active August 16, 2023 16:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gbin/0d866e86e3abdb1953a5883ccf357866 to your computer and use it in GitHub Desktop.
Save gbin/0d866e86e3abdb1953a5883ccf357866 to your computer and use it in GitHub Desktop.
Mav router example based on rust-mavlink mods made by Skyways
mod clock;
mod ego;
mod keepalive;
mod mavrouter;
use clap::Parser;
use file_rotate::compression::Compression;
use file_rotate::suffix::AppendTimestamp;
use file_rotate::suffix::FileLimit;
use file_rotate::ContentLimit;
use file_rotate::FileRotate;
use log::{debug, info};
use mavlink::ardupilotmega::MavMessage;
use mavlink::connection::routing::MAVLinkMessageRaw;
use mavlink::MavConnection;
use mavrouter::MavRouter;
use mavrouter::ShareableMavConnection;
use simplelog::{
ColorChoice, CombinedLogger, Config, LevelFilter, SharedLogger, TermLogger, TerminalMode,
WriteLogger,
};
use std::path::PathBuf;
use std::sync::mpsc::channel;
use std::sync::mpsc::Sender;
use std::sync::Arc;
use std::thread;
const MAX_ROTATING_LOG_FILES: usize = 10;
const MAX_LOG_FILE_LINES: usize = 100_000;
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
#[arg(default_value = "serial:/dev/skywayspixhawk:1500000")]
master: String,
#[arg(default_values_t = vec!["udpin:0.0.0.0:1234".to_string(), "udpin:0.0.0.0:1235".to_string()])]
slaves: Vec<String>,
#[arg(long, value_name = "LOG_PATH")]
log_path: Option<PathBuf>,
}
fn set_protocol_version(vehicle: &mut dyn MavConnection<MavMessage>) {
vehicle.set_protocol_version(mavlink::MavlinkVersion::V2);
}
// TODO(gbin): eventually convert that to async if we need a lot of connections.
fn spawn_reader_thread(
sender: Sender<(MAVLinkMessageRaw, ShareableMavConnection)>,
from: ShareableMavConnection,
) -> thread::JoinHandle<()> {
let from_copy = from.clone();
info!("reader thread started reading...");
thread::spawn(move || loop {
let msg = from_copy.raw_read().unwrap();
sender.send((msg, from_copy.clone())).unwrap();
})
}
const SERIAL_PREFIX: &str = "serial:";
const UDPIN_PREFIX: &str = "udpin:";
const TCPIN_PREFIX: &str = "tcpin:";
const TCPOUT_PREFIX: &str = "tcpout:";
/// Opens a raw mavlink connection.
///
/// A Raw connection allows the system to route at an L2 level without having to parse every single
/// message.
fn open_raw_connection(cnx_string: &str) -> ShareableMavConnection {
if cnx_string.starts_with(SERIAL_PREFIX) {
let right_part = &cnx_string[SERIAL_PREFIX.len()..];
debug!("Opening serial connection {right_part}.");
let mut serial_cnx = mavlink::connection::direct_serial::open(right_part)
.expect("Error opening serial connection."); // TODO(gbin): nicer error
// management
set_protocol_version(&mut serial_cnx);
ShareableMavConnection(Arc::new(serial_cnx))
} else if cnx_string.starts_with(UDPIN_PREFIX) {
let right_part = &cnx_string[UDPIN_PREFIX.len()..];
let mut udp_cnx =
mavlink::connection::udp::udpin(right_part).expect("Error opening udp connection.");
set_protocol_version(&mut udp_cnx);
ShareableMavConnection(Arc::new(udp_cnx))
} else if cnx_string.starts_with(TCPIN_PREFIX) {
let right_part = &cnx_string[TCPIN_PREFIX.len()..];
let mut tcp_cnx =
mavlink::connection::tcp::tcpin(right_part).expect("Error opening tcp connection.");
set_protocol_version(&mut tcp_cnx);
ShareableMavConnection(Arc::new(tcp_cnx))
} else if cnx_string.starts_with(TCPOUT_PREFIX) {
let right_part = &cnx_string[TCPOUT_PREFIX.len()..];
let mut tcp_cnx =
mavlink::connection::tcp::tcpout(right_part).expect("Error opening tcp connection.");
set_protocol_version(&mut tcp_cnx);
ShareableMavConnection(Arc::new(tcp_cnx))
} else {
panic!("Incorrect connection string: {cnx_string}");
}
}
/// Create a combined logger between the console and a log file.
///
/// # Arguments
///
/// * `log_path`: Optionally a log path to create a log file.
///
/// returns: ()
///
fn init_logger(log_path: &Option<PathBuf>) {
let mut loggers: Vec<Box<dyn SharedLogger>> = vec![
// Let it at Debug as we compile out the Debug level on release.
TermLogger::new(
LevelFilter::Debug,
Config::default(),
TerminalMode::Mixed,
ColorChoice::Auto,
),
];
if log_path.is_some() {
loggers.push(WriteLogger::new(
LevelFilter::Debug,
Config::default(),
FileRotate::new(
log_path.as_ref().unwrap(),
AppendTimestamp::default(FileLimit::MaxFiles(MAX_ROTATING_LOG_FILES)),
ContentLimit::Lines(MAX_LOG_FILE_LINES),
Compression::None,
None,
),
))
}
// configure the logger.
CombinedLogger::init(loggers).unwrap();
}
fn main() {
let args = Args::parse();
init_logger(&args.log_path);
info!("Smurf started with arguments: {:?}", args);
// We need to initialize the logger before we do anything else.
// Create a set of connections.
let master = open_raw_connection(&args.master);
let slaves = args
.slaves
.iter()
.map(|s| open_raw_connection(s))
.collect::<Vec<_>>();
// Create the main router.
let mut router = MavRouter::new();
// Mux all the connections
let (sender, receiver) = channel::<(MAVLinkMessageRaw, ShareableMavConnection)>();
spawn_reader_thread(sender.clone(), master);
for slave in slaves {
spawn_reader_thread(sender.clone(), slave);
}
// Feed all the incoming messages through the router.
loop {
let (mut msg, cnx) = receiver.recv().unwrap();
// Auto add participant based the incoming message system_id and component_id
let bf = std::time::Instant::now();
router.route_with_discovery(&mut msg, &cnx);
if bf.elapsed().as_millis() > 100 {
debug!("Took too long to route: {}ms", bf.elapsed().as_millis());
}
}
}
//! Mavlink Router.
//!
//! This module implements a mavlink low level routeur.
//!
//! By low level we mean that it will route the messages by parsing as less as possible.
use log::debug;
use log::error;
use mavlink::ardupilotmega::MavMessage;
use mavlink::connection::routing::MAVLinkMessageRaw;
use mavlink::connection::routing::RawConnection;
use mavlink::CommonMessageRaw;
use mavlink::Message;
use std::error::Error;
use std::fmt::{Display, Formatter};
use std::iter::Iterator;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
// Generic Error when a user action is not coherent with the Routing table.
#[derive(Debug)]
struct RoutingError {
incorrect_action: String,
}
impl RoutingError {
fn new(incorrect_action: &str) -> Self {
Self {
incorrect_action: incorrect_action.to_string(),
}
}
}
impl Error for RoutingError {}
impl Display for RoutingError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let reason = &self.incorrect_action;
write!(f, "Rounting error: {reason}.")
}
}
/// A RawConnection that can be shared between threads.
pub struct ShareableMavConnection(pub Arc<dyn RawConnection<MavMessage> + Send + Sync>);
impl std::fmt::Debug for ShareableMavConnection {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("ShareableMavConnection")
.field(&self.0.connection_id())
.finish()
}
}
impl PartialEq for ShareableMavConnection {
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.0, &other.0)
}
}
impl Deref for ShareableMavConnection {
type Target = Arc<dyn RawConnection<MavMessage> + Send + Sync>;
fn deref(&self) -> &Arc<dyn RawConnection<MavMessage> + Send + Sync> {
&self.0
}
}
impl DerefMut for ShareableMavConnection {
fn deref_mut(&mut self) -> &mut Arc<dyn RawConnection<MavMessage> + Send + Sync> {
&mut self.0
}
}
impl Clone for ShareableMavConnection {
fn clone_from(&mut self, source: &Self) {
*self = source.clone()
}
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
/// A routing table entry.
/// A GeneralBroadcast entry will be sent to all the "outs".
/// A SystemBroadcast entry will be sent to all the "outs" that match that system_id.
/// A Component entry will be only sent on the outs with the specific entry.
#[derive(Eq, PartialEq, Debug)]
pub enum MavRoutingEntry {
GeneralBroadcast,
SystemBroadcast { system_id: u8 },
Component { system_id: u8, component_id: u8 },
Nowhere,
}
use MavRoutingEntry::*;
impl Display for MavRoutingEntry {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
GeneralBroadcast => write!(f, "Broadcast"),
SystemBroadcast { system_id } => write!(f, "System {system_id}"),
Component {
system_id,
component_id,
} => write!(f, "({system_id}, {component_id})"),
Nowhere => write!(f, "Nowhere"),
}
}
}
impl From<(u8, u8)> for MavRoutingEntry {
fn from(syscomp: (u8, u8)) -> Self {
let (system_id, component_id) = syscomp;
match (system_id, component_id) {
(0, _) => GeneralBroadcast,
(system_id, 0) => SystemBroadcast { system_id },
(system_id, component_id) => Component {
system_id,
component_id,
},
}
}
}
// Generates up to 3 routes per system and component id.
fn generate_routes(system_id: u8, component_id: u8) -> impl Iterator<Item = MavRoutingEntry> {
match (system_id, component_id) {
(0, _) => [GeneralBroadcast, Nowhere, Nowhere].into_iter(),
(system_id, 0) => [GeneralBroadcast, SystemBroadcast { system_id }, Nowhere].into_iter(),
(system_id, component_id) => [
GeneralBroadcast,
SystemBroadcast { system_id },
Component {
system_id,
component_id,
},
]
.into_iter(),
}
}
/// This structure represents a routing entry with its associated connection.
/// It is used to keep track of the Systems & Components that are connected to the router through a specific connection.
struct RoutingConnectionPair {
entry: MavRoutingEntry,
cnx: ShareableMavConnection,
}
impl RoutingConnectionPair {
fn new(entry: MavRoutingEntry, cnx: ShareableMavConnection) -> Self {
RoutingConnectionPair { entry, cnx }
}
}
impl Display for RoutingConnectionPair {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.entry)
}
}
impl PartialEq for RoutingConnectionPair {
fn eq(&self, other: &Self) -> bool {
self.entry == other.entry && self.cnx == other.cnx
}
}
/// This is the main structure of the router, it keeps a list of all the connections and their associated routing entries.
pub struct MavRouter {
routing_table: Vec<RoutingConnectionPair>,
}
impl MavRouter {
pub fn new() -> Self {
MavRouter {
routing_table: Vec::<RoutingConnectionPair>::new(),
}
}
fn is_route_already_present<'a>(
&self,
entry: &MavRoutingEntry,
cnx: &ShareableMavConnection,
) -> bool {
self.routing_table
.iter()
.any(move |pair| pair.entry == *entry && pair.cnx == *cnx)
}
/// asks the router to keep that of a specific participant over a link.
pub fn add_participant(
&mut self,
system_id: u8,
component_id: u8,
cnx: &ShareableMavConnection,
) {
for route in generate_routes(system_id, component_id) {
if route == Nowhere {
continue;
};
if !self.is_route_already_present(&route, cnx) {
debug!("Adding new route {:?} for {:?}", route, cnx);
self.routing_table
.push(RoutingConnectionPair::new(route, cnx.clone()));
}
}
}
/// This is a main function of the router, it takes a message and a connection and sends the message to all the connections that match the routing entry.
pub fn route(&self, msg: &mut MAVLinkMessageRaw, origin: &ShareableMavConnection) -> usize {
let mut total_bytes_sent = 0usize;
let msg_id = match msg {
MAVLinkMessageRaw::V1(msg) => msg.message_id(),
MAVLinkMessageRaw::V2(msg) => msg.message_id(),
};
let (target_system_id_offset, target_component_id_offset) =
MavMessage::target_offsets_from_id(msg_id);
let target_system_component_ids =
match (target_system_id_offset, target_component_id_offset) {
(None, None) => (0, 0),
(None, Some(_)) => todo!(), // should not happen.
(Some(target_system_id_offset), None) => {
// Mavlink v2 protocol assumes trailing zeros for anything over the communicated
// length.
if target_system_id_offset >= msg.payload_length().into() {
(0, 0)
} else {
let target_system_id = msg.payload()[target_system_id_offset];
(target_system_id, 0)
}
}
(Some(target_system_id_offset), Some(target_component_id_offset)) => {
// Mavlink v2 protocol assumes trailing zeros for anything over the communicated
// length.
let mut target_system_id = 0;
let mut target_component_id = 0;
if target_system_id_offset < msg.payload_length().into() {
target_system_id = msg.payload()[target_system_id_offset];
}
if target_component_id_offset < msg.payload_length().into() {
target_component_id = msg.payload()[target_component_id_offset];
}
(target_system_id, target_component_id)
}
};
let dest = MavRoutingEntry::from(target_system_component_ids);
for route in self.routing_table.iter() {
if route.entry == dest && route.cnx != *origin {
let Ok(size) = route.cnx.raw_write(msg) else {
error!("Write error on the connections for {route}");
continue;
};
total_bytes_sent += size;
}
}
total_bytes_sent
}
/// This is the main function of the router that do both the routing and keeps track of any new
/// participant showing up over a link.
pub fn route_with_discovery(
&mut self,
msg: &mut MAVLinkMessageRaw,
origin: &ShareableMavConnection,
) {
// Auto add participant based the incoming message system_id and component_id
self.add_participant(msg.system_id(), msg.component_id(), &origin);
self.route(msg, origin);
}
}
#[cfg(test)]
mod tests {
use crate::mavrouter::MavRouter;
use crate::mavrouter::MavRoutingEntry;
use crate::mavrouter::RawConnection;
use crate::mavrouter::RoutingConnectionPair;
use crate::mavrouter::ShareableMavConnection;
use mavlink::ardupilotmega::MavMessage;
use mavlink::connection::routing::MAVLinkMessageRaw;
use mavlink::CommonMessageRaw;
use mavlink::MAVLinkV2MessageRaw;
use mavlink::MavHeader;
use mockall::mock;
use mockall::predicate::*;
use std::sync::Arc;
mock! {
pub InnerConnectionRef {} // This generates automagically MockConnection.
impl RawConnection<MavMessage> for InnerConnectionRef {
fn raw_write(&self, raw_msg: &mut MAVLinkMessageRaw) -> std::io::Result<usize>;
fn raw_read(&self) -> std::io::Result <MAVLinkMessageRaw>;
fn connection_id(&self) -> String {
"mock".to_string()
}
}
}
pub fn request_ping(from_system_id: u8, from_component_id: u8) -> MavMessage {
MavMessage::PING(mavlink::ardupilotmega::PING_DATA {
time_usec: 123456,
seq: 12,
target_system: from_system_id,
target_component: from_component_id,
})
}
pub fn request_parameters_system() -> MavMessage {
MavMessage::PARAM_REQUEST_LIST(mavlink::ardupilotmega::PARAM_REQUEST_LIST_DATA {
target_system: 5,
target_component: 0,
})
}
pub fn request_parameters_component() -> MavMessage {
MavMessage::PARAM_REQUEST_LIST(mavlink::ardupilotmega::PARAM_REQUEST_LIST_DATA {
target_system: 3,
target_component: 14,
})
}
#[test]
fn full_broadcast() {
// define a test message we want to see routed.
let mut raw_msg1 = MAVLinkV2MessageRaw::new();
let test_msg = request_ping(0, 0);
let hdr = MavHeader {
system_id: 2,
component_id: 3,
sequence: 12,
};
raw_msg1.serialize_message::<MavMessage>(hdr, &test_msg);
let l = raw_msg1.len();
let master = ShareableMavConnection(Arc::new(MockInnerConnectionRef::new()));
// ^^ This assures it will NOT Broadcast back to master as this is this origin.
let mut inner_slave1_mock = MockInnerConnectionRef::new();
inner_slave1_mock
.expect_raw_write()
.once()
.withf(move |msg| raw_msg1.full() == msg.full())
.returning(move |_| std::io::Result::Ok(l));
let slave1 = ShareableMavConnection(Arc::new(inner_slave1_mock));
let mut inner_slave2_mock = MockInnerConnectionRef::new();
inner_slave2_mock
.expect_raw_write()
.once()
.withf(move |msg| raw_msg1.full() == msg.full())
.returning(move |_| std::io::Result::Ok(l));
let slave2 = ShareableMavConnection(Arc::new(inner_slave2_mock));
// Setup our router and participants
let mut router = MavRouter::new();
router.add_participant(1, 1, &master); // should route to
router.add_participant(3, 14, &slave1); // should route to
router.add_participant(3, 14, &slave2); // should route to
router.route(&mut MAVLinkMessageRaw::V2(raw_msg1), &master);
}
#[test]
fn simple_point_to_point() {
// define a test message we want to see routed.
let mut raw_msg1 = MAVLinkV2MessageRaw::new();
let test_msg = request_parameters_component();
let hdr = MavHeader {
system_id: 1,
component_id: 1,
sequence: 12,
};
raw_msg1.serialize_message::<MavMessage>(hdr, &test_msg);
let l = raw_msg1.len();
let master = ShareableMavConnection(Arc::new(MockInnerConnectionRef::new()));
let mut inner_slave1_mock = MockInnerConnectionRef::new();
inner_slave1_mock
.expect_raw_write()
.once()
.withf(move |msg| raw_msg1.full() == msg.full())
.returning(move |_| std::io::Result::Ok(l));
let slave1 = ShareableMavConnection(Arc::new(inner_slave1_mock));
let slave2 = ShareableMavConnection(Arc::new(MockInnerConnectionRef::new()));
// Setup our router and participants
let mut router = MavRouter::new();
router.add_participant(1, 1, &master); // should not route to
router.add_participant(3, 14, &slave1); // should route to
router.add_participant(4, 14, &slave2); // should not route to
router.route(&mut MAVLinkMessageRaw::V2(raw_msg1), &master);
}
#[test]
fn simple_component() {
// define a test message we want to see routed.
let mut raw_msg1 = MAVLinkV2MessageRaw::new();
let test_msg = request_parameters_system();
let hdr = MavHeader {
system_id: 1,
component_id: 1,
sequence: 12,
};
raw_msg1.serialize_message::<MavMessage>(hdr, &test_msg);
let l = raw_msg1.len();
let master = ShareableMavConnection(Arc::new(MockInnerConnectionRef::new()));
let mut inner_slave1_mock = MockInnerConnectionRef::new();
inner_slave1_mock
.expect_raw_write()
.once()
.withf(move |msg| raw_msg1.full() == msg.full())
.returning(move |_| std::io::Result::Ok(l));
let slave1 = ShareableMavConnection(Arc::new(inner_slave1_mock));
let mut inner_slave2_mock = MockInnerConnectionRef::new();
inner_slave2_mock
.expect_raw_write()
.once()
.withf(move |msg| raw_msg1.full() == msg.full())
.returning(move |_| std::io::Result::Ok(l));
let slave2 = ShareableMavConnection(Arc::new(inner_slave2_mock));
let slave3 = ShareableMavConnection(Arc::new(MockInnerConnectionRef::new()));
// Setup our router and participants
let mut router = MavRouter::new();
router.add_participant(1, 1, &master);
router.add_participant(5, 14, &slave1); // should route to
router.add_participant(5, 13, &slave2); // should route to too
router.add_participant(9, 14, &slave3); // should not route to
router.route(&mut MAVLinkMessageRaw::V2(raw_msg1), &master);
}
#[test]
fn test_discovery_empty() {
// define a test message we want to see routed.
let mut raw_msg1 = MAVLinkV2MessageRaw::new();
let test_msg = request_parameters_component(); // this is targeted for 3,14
let hdr = MavHeader {
system_id: 1,
component_id: 1,
sequence: 12,
};
raw_msg1.serialize_message::<MavMessage>(hdr, &test_msg);
let mut router = MavRouter::new();
// We should start with an empty routing table.
assert_eq!(router.routing_table.len(), 0);
let wannabe_master = ShareableMavConnection(Arc::new(MockInnerConnectionRef::new()));
// This will make the system realize that 1,1 is on wannabe_master
router.route_with_discovery(&mut MAVLinkMessageRaw::V2(raw_msg1), &wannabe_master);
assert_eq!(router.routing_table.len(), 3);
assert!(router.routing_table.contains(&RoutingConnectionPair {
entry: MavRoutingEntry::GeneralBroadcast,
cnx: wannabe_master.clone()
}));
assert!(router.routing_table.contains(&RoutingConnectionPair {
entry: MavRoutingEntry::SystemBroadcast { system_id: 1 },
cnx: wannabe_master.clone()
}));
assert!(router.routing_table.contains(&RoutingConnectionPair {
entry: MavRoutingEntry::Component {
system_id: 1,
component_id: 1
},
cnx: wannabe_master.clone()
}));
}
#[test]
fn test_discovery_end_to_end() {
// define a test message we want to see routed.
let mut from_master_to_slave = MAVLinkV2MessageRaw::new();
let from_master_to_slave_msg = request_ping(3, 14);
let from_master_to_slave_hdr = MavHeader {
system_id: 1,
component_id: 1,
sequence: 1,
};
from_master_to_slave
.serialize_message::<MavMessage>(from_master_to_slave_hdr, &from_master_to_slave_msg);
let mut from_slave_to_master = MAVLinkV2MessageRaw::new();
let from_slave_to_master_msg = request_ping(1, 1);
let from_slave_to_master_hdr = MavHeader {
system_id: 3,
component_id: 14,
sequence: 1,
};
from_slave_to_master
.serialize_message::<MavMessage>(from_slave_to_master_hdr, &from_slave_to_master_msg);
let l = from_slave_to_master.len();
let mut router = MavRouter::new();
let mut inner_master_mock = MockInnerConnectionRef::new();
inner_master_mock
.expect_raw_write()
.once()
.withf(move |msg| msg.full() == from_slave_to_master.full())
.returning(move |_| std::io::Result::Ok(l));
let wannabe_master = ShareableMavConnection(Arc::new(inner_master_mock));
let wannabe_slave = ShareableMavConnection(Arc::new(MockInnerConnectionRef::new()));
// simulate the first ping from master to a slave going into the ether (no slave was
// discovered yet)
router.route_with_discovery(
&mut MAVLinkMessageRaw::V2(from_master_to_slave),
&wannabe_master,
);
// simulate the first ping from slave to a master, should trigger the routing
router.route_with_discovery(
&mut MAVLinkMessageRaw::V2(from_slave_to_master),
&wannabe_slave,
);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment