Skip to content

Instantly share code, notes, and snippets.

@TheConstructAi
Created November 3, 2021 08:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TheConstructAi/c78fe54b9e4fb1b50b87f1ae22ae31b1 to your computer and use it in GitHub Desktop.
Save TheConstructAi/c78fe54b9e4fb1b50b87f1ae22ae31b1 to your computer and use it in GitHub Desktop.
import rclpy
import time
import math
import numpy as np
from rclpy.node import Node
from geometry_msgs.msg import Twist
from sensor_msgs.msg import LaserScan
from nav_msgs.msg import Odometry
from rclpy.qos import ReliabilityPolicy, QoSProfile
from rclpy.callback_groups import ReentrantCallbackGroup, MutuallyExclusiveCallbackGroup
from rclpy.executors import MultiThreadedExecutor, SingleThreadedExecutor
from project_custom_interfaces.srv import GoToLoading
from extra_exercises.quat_to_euler import euler_from_quaternion
# SOLUTION FOR THE ROS ONLINE INDUSTRIAL WORKSHOP, PROVIDED BY THECONSTRUCT
# issue, please contact with duckfrost@theconstructsim.com
class Service(Node):
def __init__(self, distance_max_obstacle= 0.2, odom_topic="/odom", scan_topic="scan", cmd_vel_topic="/robot/cmd_vel",custom_move_srv='/custom_moving'):
self._odom_topic = odom_topic
self._scan_topic = scan_topic
self._cmd_vel_topic = cmd_vel_topic
self._custom_move_srv = custom_move_srv
self.laser_forward = None
self.x_pos = None
self.y_pos = None
self.yaw = None
self._distance_max_obstacle = distance_max_obstacle
self.g1 = MutuallyExclusiveCallbackGroup()
self.g2 = MutuallyExclusiveCallbackGroup()
self.g3 = MutuallyExclusiveCallbackGroup()
super().__init__('movement_server')
rclpy.logging.set_logger_level('movement_server', rclpy.logging.LoggingSeverity.INFO)
self.publisher_ = self.create_publisher(Twist, self._cmd_vel_topic, 10)
self.laser_forward = None
self.scan_subscriber_ = self.create_subscription(
LaserScan,
self._scan_topic,
self.scan_callback,
QoSProfile(depth=10, reliability=ReliabilityPolicy.BEST_EFFORT),
callback_group=self.g2)
self.odom_subscriber= self.create_subscription(
Odometry,
self._odom_topic,
self.odom_callback,
QoSProfile(depth=10, reliability=ReliabilityPolicy.BEST_EFFORT),
callback_group=self.g3)
self.msg = Twist()
self.srv = self.create_service(
GoToLoading,
self._custom_move_srv,
self.CustomService_callback,
callback_group=self.g1)
self.get_logger().warn("Init DONE...")
def CustomService_callback(self, request, response):
"""
This service has to move the robot using the odometry data from the origin
position to the loading position.
"""
self.get_logger().warn("Start Service Call...")
self.check_sensors_ready()
# We move first forward , checking the laser scan to not crash
# We check the distance traveled also
self.move_forwards(5.6)
# We turn 90 degrees to the right
self.turn(90.0)
# We move again checkng scan and distance
self.move_forwards(1.5)
# We turn again to be ready to return to base
self.turn(90.0)
# We answer completed task succesfully
response.complete = True
self.get_logger().warn("END Service Call...")
# return the response parameter
return response
def check_sensors_ready(self):
ready = (self.laser_forward is not None) and (self.x_pos is not None) and (self.y_pos is not None) and (self.yaw is not None)
while not ready:
self.get_logger().error("Sensors NOT ready yet "+str(ready))
self.get_logger().error("laser_forward "+str(self.laser_forward))
self.get_logger().error("x_pos "+str(self.x_pos))
self.get_logger().error("y_pos "+str(self.y_pos))
self.get_logger().error("yaw "+str(self.yaw))
self.wait_for_sec(0.5)
ready = (self.laser_forward is not None) and (self.x_pos is not None) and (self.y_pos is not None) and (self.yaw is not None)
self.get_logger().warn("Sensors READY")
def go_forwards(self):
# We check we arent too close
if self.laser_forward < self._distance_max_obstacle:
self.msg.linear.x = 0.0
self.msg.angular.z = 0.0
self.get_logger().error("Too CLOSE TO OBSTACLE, CANT MOVE front_laser=="+str(self.laser_forward))
else:
self.msg.linear.x = 0.4
self.msg.angular.z = 0.0
self.get_logger().warn("go forwards, front_laser=="+str(self.laser_forward))
self.publisher_.publish(self.msg)
def stop(self):
self.msg.linear.x = 0.0
self.msg.angular.z = 0.0
self.publisher_.publish(self.msg)
def turn_left(self):
self.msg.linear.x = 0.0
self.msg.angular.z = 0.2
self.publisher_.publish(self.msg)
def turn_right(self):
self.msg.linear.x = 0.0
self.msg.angular.z = -0.2
self.publisher_.publish(self.msg)
def move_forwards(self, distance):
self.get_logger().info("Init...MOVE FORWARDS")
distance_travelled = 0
init_x_position = self.x_pos
init_y_position = self.y_pos
self.get_logger().info("Init...last_x_position="+str(init_x_position))
self.get_logger().info("Init...init_y_position="+str(init_y_position))
while distance_travelled < distance:
# Move and check laser
self.go_forwards()
self.wait_for_sec(wait_sec=0.1, delta=0.01)
# Check distance incement
delta_x = init_x_position - self.x_pos
delta_y = init_y_position - self.y_pos
distance_travelled = math.sqrt(pow(delta_x,2) + pow(delta_y,2))
self.get_logger().info("distance_travelled="+str(distance_travelled))
self.get_logger().info("self.x_pos="+str(self.x_pos))
self.get_logger().info("init_x_position="+str(init_x_position))
self.get_logger().info("delta_x="+str(delta_x))
self.get_logger().info("self.y_pos="+str(self.y_pos))
self.get_logger().info("init_y_position="+str(init_y_position))
self.get_logger().info("delta_y="+str(delta_y))
self.stop()
self.get_logger().info("############ REACHED DISTANCE = "+str(distance_travelled))
def turn(self, turn_angle_degrees):
turn_angle = np.radians(turn_angle_degrees)
self.get_logger().info("Init...TURN turn_angle_degrees="+str(turn_angle_degrees))
self.get_logger().info("Init...TURN turn_angle="+str(turn_angle))
angle_turned = 0
init_yaw = self.yaw
self.get_logger().info("Init...init_yaw="+str(init_yaw))
# Start moving
turn_sign = np.sign([turn_angle])[0]
if turn_sign > 0:
self.turn_right()
else:
self.turn_left()
# We have to check the abs, to be compatible with both turn right+, turn left negative
while abs(angle_turned) < abs(turn_angle):
self.wait_for_sec(wait_sec=0.1, delta=0.01)
# Check distance incement
angle_turned = init_yaw - self.yaw
self.get_logger().info("#######")
self.get_logger().info("angle_turned="+str(angle_turned))
self.get_logger().info("self.yaw="+str(self.yaw))
self.get_logger().info("#######")
self.stop()
self.get_logger().info("############ REACHED ANGLE = "+str(angle_turned))
def wait_for_sec(self, wait_sec, delta=0.1):
i = 0
while i < wait_sec :
# self.get_logger().debug("..."+str(i))
time.sleep(delta)
i += delta
def scan_callback(self, msg):
length_scan_ranges = len(msg.ranges)
front_distance_index = int(length_scan_ranges / 2)
# We get the front Section, no only one value
front_minimum = 30.0
number_of_points = 50
range_subset = msg.ranges[(front_distance_index-number_of_points):(front_distance_index+number_of_points+1)]
for value in range_subset:
if value < front_minimum:
front_minimum = value
self.laser_forward = front_minimum
self.get_logger().debug('LASER FRONT MIN VALUE: "%s"' % str(self.laser_forward))
def odom_callback(self, msg):
self.x_pos = msg.pose.pose.position.x
self.y_pos = msg.pose.pose.position.y
quaternion_obj = msg.pose.pose.orientation
roll, pitch, self.yaw = euler_from_quaternion(quaternion_obj)
self.get_logger().debug("["+str(self.x_pos)+","+str(self.y_pos)+","+str(self.yaw)+"]")
def main(args=None):
# initialize the ROS communication
rclpy.init(args=args)
try:
# declare the node constructor
service = Service()
executor = MultiThreadedExecutor(num_threads=3)
executor.add_node(service)
try:
# pause the program execution, waits for a request to kill the node (ctrl+c)
executor.spin()
finally:
executor.shutdown()
service.destroy_node()
finally:
# shutdown the ROS communication
rclpy.shutdown()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment