Skip to content

Instantly share code, notes, and snippets.

@AlexisTM
Created November 23, 2018 15:49
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save AlexisTM/4a825e1a3a3bbc83f8e1ebc295adeb61 to your computer and use it in GitHub Desktop.
Save AlexisTM/4a825e1a3a3bbc83f8e1ebc295adeb61 to your computer and use it in GitHub Desktop.
Mavros GCS heartbeat simulation => Have status_recv message allowing calibration.
#!/usr/bin/env python2
from __future__ import division
import os
import glob
import rospy
from pymavlink import mavutil
from threading import Thread
from mavros_msgs.msg import Mavlink, StatusText
from mavros import mavlink
class GroundControl():
def __init__(self, agent='mavros', user_status_cb=None):
self.agent = agent
self.mavlink_pub = rospy.Publisher('/mavlink/to', Mavlink, queue_size=1)
self.statustext_sub = rospy.Subscriber(agent + '/statustext/recv', StatusText, self.status_cb)
self.user_status_cb = user_status_cb
self.heartbeat_thread = Thread(target=self.send_heartbeat, args=())
self.heartbeat_thread.daemon = True
self.heartbeat_thread.start()
def status_cb(self, msg):
if self.user_status_cb is not None:
self.user_status_cb(msg, self.agent)
def send_heartbeat(self):
rate = rospy.Rate(2) # Hz
self.heartbeat_msg = mavutil.mavlink.MAVLink_heartbeat_message(
mavutil.mavlink.MAV_TYPE_GCS, 0, 0, 0, 0, 0)
self.heartbeat_msg.pack(mavutil.mavlink.MAVLink('', 2, 1))
self.heartbeat_ros = mavlink.convert_to_rosmsg(self.heartbeat_msg)
while not rospy.is_shutdown():
try:
self.mavlink_pub.publish(self.heartbeat_ros)
rate.sleep()
except rospy.ROSInterruptException:
pass
def printing_callback(msg, agent):
print agent, ': ', msg
if __name__ == '__main__':
rospy.init_node('mavros_gcs', anonymous=True)
mavros_gcs = GroundControl("odroid130", user_status_cb=printing_callback)
rospy.spin()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment