Skip to content

Instantly share code, notes, and snippets.

@suraj2596
Created November 30, 2019 11:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save suraj2596/d14fb28f7e92fc1fc50e7e2c07b60626 to your computer and use it in GitHub Desktop.
Save suraj2596/d14fb28f7e92fc1fc50e7e2c07b60626 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python2
import time
import numpy as np
import rospy
from sensor_msgs.msg import Imu
import mavros_msgs
from mavros_msgs.msg import ActuatorControl, State, Thrust
from mavros_msgs.srv import CommandBool, SetMode
from geometry_msgs.msg import PoseStamped
from tf.transformations import euler_from_quaternion
class Drone():
def __init__(self):
rospy.init_node('test_node', anonymous=True)
# Variables
self.state = State()
# ROS services
service_timeout = 30
rospy.loginfo("waiting for ROS services")
try:
rospy.wait_for_service('mavros/cmd/arming', service_timeout)
rospy.wait_for_service('mavros/set_mode', service_timeout)
rospy.loginfo("ROS services are up")
except rospy.ROSException:
self.fail("failed to connect to services")
self.set_arming_srv = rospy.ServiceProxy('mavros/cmd/arming',CommandBool)
self.set_mode_srv = rospy.ServiceProxy('mavros/set_mode', SetMode)
# ROS Subscribers
rospy.Subscriber("/mavros/state", State, self.state_callback)
# rospy.Subscriber("/mavros/imu/data", Imu, self.imu_callback)
# ROS publishers
# self.thrust_pub = rospy.Publisher("/mavros/setpoint_attitude/thrust", Thrust, queue_size=10)
# self.actuator_pub = rospy.Publisher("/mavros/target_actuator_control", ActuatorControl, queue_size=10)
self.rate = rospy.Rate(50)
def set_arm(self, arm, timeout):
"""arm: True to arm or False to disarm, timeout(int): seconds"""
rospy.loginfo("setting FCU arm: {}".format(arm))
old_arm = self.state.armed
loop_freq = 1 # Hz
rate = rospy.Rate(loop_freq)
arm_set = False
for i in xrange(timeout * loop_freq):
if self.state.armed == arm:
arm_set = True
rospy.loginfo("set arm success | seconds: {0} of {1}".format(
i / loop_freq, timeout))
break
else:
try:
res = self.set_arming_srv(arm)
if not res.success:
rospy.logerr("failed to send arm command")
except rospy.ServiceException as e:
rospy.logerr(e)
# try:
rate.sleep()
# except rospy.ROSException as e:
# self.fail(e)
# self.assertTrue(arm_set, (
# "failed to set arm | new arm: {0}, old arm: {1} | timeout(seconds): {2}".
# format(arm, old_arm, timeout)))
def set_mode(self, mode, timeout):
"""mode: PX4 mode string, timeout(int): seconds"""
rospy.loginfo("setting FCU mode: {0}".format(mode))
old_mode = self.state.mode
loop_freq = 1 # Hz
rate = rospy.Rate(loop_freq)
mode_set = False
for i in xrange(timeout * loop_freq):
if self.state.mode == mode:
mode_set = True
rospy.loginfo("set mode success | seconds: {0} of {1}".format(
i / loop_freq, timeout))
break
else:
try:
res = self.set_mode_srv(0, mode) # 0 is custom mode
if not res.mode_sent:
rospy.logerr("failed to send mode command")
except rospy.ServiceException as e:
rospy.logerr(e)
# try:
rate.sleep()
# except rospy.ROSException as e:
# self.fail(e)
# self.assertTrue(mode_set, (
# "failed to set mode | new mode: {0}, old mode: {1} | timeout(seconds): {2}".
# format(mode, old_mode, timeout)))
# CALLBACKS
def state_callback(self, data):
self.state = data
print(self.state.mode)
def imu_callback(self, data):
print(data.mode)
# rospy.loginfo(rospy.get_caller_id() + "\nlinear acceleration:\nx: [{}]\ny: [{}]\nz: [{}]"
# .format(data.linear_acceleration.x, data.linear_acceleration.y, data.linear_acceleration.z))
# def command_actuator(self, a, inputs):
# a.header.stamp = rospy.Time.now()
# a.group_mix = 2 # ActuatorControl.PX4_MIX_PAYLOAD
# a.controls = inputs
if __name__ == '__main__':
drone1 = Drone()
# Wait for heartbeat
# send junky setpoints
junkmsg = Thrust()
junkmsg.thrust = 0.5
# print(junkmsg)
thrust_pub = rospy.Publisher("mavros/setpoint_attitude/thrust", Thrust, queue_size=3)
time.sleep(1)
# thrust_pub.publish(junkmsg)
for i in range(100):
time.sleep(0.05)
thrust_pub.publish(junkmsg)
# set mode to OFFBOARD
drone1.set_mode("OFFBOARD", 10)
# time.sleep(2)
# Arm the drone
# drone1.set_arm(True,5)
# time.sleep(2)
# Initialize Actuator
# drone1.actuator_control_message_1 = ActuatorControl()
# # Set desired rpm
# inputs = np.array((0.0, 0.3, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0))
# drone1.actuator_control_message_1.header.stamp = rospy.Time.now()
# drone1.actuator_control_message_1.group_mix = 3 # ActuatorControl.PX4_MIX_PAYLOAD
# drone1.actuator_control_message_1.controls = inputs
# drone1.actuator_pub.publish(drone1.actuator_control_message_1)
# # listener()
rospy.spin()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment