Skip to content

Instantly share code, notes, and snippets.

@annesteenbeek
Last active February 16, 2022 17:20
Show Gist options
  • Save annesteenbeek/5370f62cf85bb9d6825327bff1b85293 to your computer and use it in GitHub Desktop.
Save annesteenbeek/5370f62cf85bb9d6825327bff1b85293 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import rospy
import mavros
from geometry_msgs.msg import PoseStamped
from mavros.msg import State
from mavros.srv import CommandBool, SetMode
# callback method for state sub
current_state = State()
offb_set_mode = SetMode
def state_cb(state):
global current_state
current_state = state
local_pos_pub = rospy.Publisher(mavros.get_topic('setpoint_position', 'local'), PoseStamped, queue_size=10)
state_sub = rospy.Subscriber(mavros.get_topic('state'), State, state_cb)
arming_client = rospy.ServiceProxy(mavros.get_topic('cmd', 'arming'), CommandBool)
set_mode_client = rospy.ServiceProxy(mavros.get_topic('set_mode'), SetMode)
pose = PoseStamped()
pose.pose.position.x = 0
pose.pose.position.y = 0
pose.pose.position.z = 2
def position_control():
rospy.init_node('offb_node', anonymous=True)
prev_state = current_state
rate = rospy.Rate(20.0) # MUST be more then 2Hz
# send a few setpoints before starting
for i in range(100):
local_pos_pub.publish(pose)
rate.sleep()
# wait for FCU connection
while not current_state.connected:
rate.sleep()
last_request = rospy.get_rostime()
while not rospy.is_shutdown():
now = rospy.get_rostime()
if current_state.mode != "OFFBOARD" and (now - last_request > rospy.Duration(5.)):
set_mode_client(base_mode=0, custom_mode="OFFBOARD")
last_request = now
else:
if not current_state.armed and (now - last_request > rospy.Duration(5.)):
arming_client(True)
last_request = now
# older versions of PX4 always return success==True, so better to check Status instead
if prev_state.armed != current_state.armed:
rospy.loginfo("Vehicle armed: %r" % current_state.armed)
if prev_state.mode != current_state.mode:
rospy.loginfo("Current mode: %s" % current_state.mode)
prev_state = current_state
# Update timestamp and publish pose
pose.header.stamp = rospy.Time.now()
local_pos_pub.publish(pose)
rate.sleep()
if __name__ == '__main__':
try:
position_control()
except rospy.ROSInterruptException:
pass
@Wellington-Froelich
Copy link

Hi @annesteenbeek I am also curious about adding a trajectory into your code, would that be done in the while loop?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment