Created
November 3, 2021 08:58
-
-
Save TheConstructAi/c78fe54b9e4fb1b50b87f1ae22ae31b1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rclpy | |
import time | |
import math | |
import numpy as np | |
from rclpy.node import Node | |
from geometry_msgs.msg import Twist | |
from sensor_msgs.msg import LaserScan | |
from nav_msgs.msg import Odometry | |
from rclpy.qos import ReliabilityPolicy, QoSProfile | |
from rclpy.callback_groups import ReentrantCallbackGroup, MutuallyExclusiveCallbackGroup | |
from rclpy.executors import MultiThreadedExecutor, SingleThreadedExecutor | |
from project_custom_interfaces.srv import GoToLoading | |
from extra_exercises.quat_to_euler import euler_from_quaternion | |
# SOLUTION FOR THE ROS ONLINE INDUSTRIAL WORKSHOP, PROVIDED BY THECONSTRUCT | |
# issue, please contact with duckfrost@theconstructsim.com | |
class Service(Node): | |
def __init__(self, distance_max_obstacle= 0.2, odom_topic="/odom", scan_topic="scan", cmd_vel_topic="/robot/cmd_vel",custom_move_srv='/custom_moving'): | |
self._odom_topic = odom_topic | |
self._scan_topic = scan_topic | |
self._cmd_vel_topic = cmd_vel_topic | |
self._custom_move_srv = custom_move_srv | |
self.laser_forward = None | |
self.x_pos = None | |
self.y_pos = None | |
self.yaw = None | |
self._distance_max_obstacle = distance_max_obstacle | |
self.g1 = MutuallyExclusiveCallbackGroup() | |
self.g2 = MutuallyExclusiveCallbackGroup() | |
self.g3 = MutuallyExclusiveCallbackGroup() | |
super().__init__('movement_server') | |
rclpy.logging.set_logger_level('movement_server', rclpy.logging.LoggingSeverity.INFO) | |
self.publisher_ = self.create_publisher(Twist, self._cmd_vel_topic, 10) | |
self.laser_forward = None | |
self.scan_subscriber_ = self.create_subscription( | |
LaserScan, | |
self._scan_topic, | |
self.scan_callback, | |
QoSProfile(depth=10, reliability=ReliabilityPolicy.BEST_EFFORT), | |
callback_group=self.g2) | |
self.odom_subscriber= self.create_subscription( | |
Odometry, | |
self._odom_topic, | |
self.odom_callback, | |
QoSProfile(depth=10, reliability=ReliabilityPolicy.BEST_EFFORT), | |
callback_group=self.g3) | |
self.msg = Twist() | |
self.srv = self.create_service( | |
GoToLoading, | |
self._custom_move_srv, | |
self.CustomService_callback, | |
callback_group=self.g1) | |
self.get_logger().warn("Init DONE...") | |
def CustomService_callback(self, request, response): | |
""" | |
This service has to move the robot using the odometry data from the origin | |
position to the loading position. | |
""" | |
self.get_logger().warn("Start Service Call...") | |
self.check_sensors_ready() | |
# We move first forward , checking the laser scan to not crash | |
# We check the distance traveled also | |
self.move_forwards(5.6) | |
# We turn 90 degrees to the right | |
self.turn(90.0) | |
# We move again checkng scan and distance | |
self.move_forwards(1.5) | |
# We turn again to be ready to return to base | |
self.turn(90.0) | |
# We answer completed task succesfully | |
response.complete = True | |
self.get_logger().warn("END Service Call...") | |
# return the response parameter | |
return response | |
def check_sensors_ready(self): | |
ready = (self.laser_forward is not None) and (self.x_pos is not None) and (self.y_pos is not None) and (self.yaw is not None) | |
while not ready: | |
self.get_logger().error("Sensors NOT ready yet "+str(ready)) | |
self.get_logger().error("laser_forward "+str(self.laser_forward)) | |
self.get_logger().error("x_pos "+str(self.x_pos)) | |
self.get_logger().error("y_pos "+str(self.y_pos)) | |
self.get_logger().error("yaw "+str(self.yaw)) | |
self.wait_for_sec(0.5) | |
ready = (self.laser_forward is not None) and (self.x_pos is not None) and (self.y_pos is not None) and (self.yaw is not None) | |
self.get_logger().warn("Sensors READY") | |
def go_forwards(self): | |
# We check we arent too close | |
if self.laser_forward < self._distance_max_obstacle: | |
self.msg.linear.x = 0.0 | |
self.msg.angular.z = 0.0 | |
self.get_logger().error("Too CLOSE TO OBSTACLE, CANT MOVE front_laser=="+str(self.laser_forward)) | |
else: | |
self.msg.linear.x = 0.4 | |
self.msg.angular.z = 0.0 | |
self.get_logger().warn("go forwards, front_laser=="+str(self.laser_forward)) | |
self.publisher_.publish(self.msg) | |
def stop(self): | |
self.msg.linear.x = 0.0 | |
self.msg.angular.z = 0.0 | |
self.publisher_.publish(self.msg) | |
def turn_left(self): | |
self.msg.linear.x = 0.0 | |
self.msg.angular.z = 0.2 | |
self.publisher_.publish(self.msg) | |
def turn_right(self): | |
self.msg.linear.x = 0.0 | |
self.msg.angular.z = -0.2 | |
self.publisher_.publish(self.msg) | |
def move_forwards(self, distance): | |
self.get_logger().info("Init...MOVE FORWARDS") | |
distance_travelled = 0 | |
init_x_position = self.x_pos | |
init_y_position = self.y_pos | |
self.get_logger().info("Init...last_x_position="+str(init_x_position)) | |
self.get_logger().info("Init...init_y_position="+str(init_y_position)) | |
while distance_travelled < distance: | |
# Move and check laser | |
self.go_forwards() | |
self.wait_for_sec(wait_sec=0.1, delta=0.01) | |
# Check distance incement | |
delta_x = init_x_position - self.x_pos | |
delta_y = init_y_position - self.y_pos | |
distance_travelled = math.sqrt(pow(delta_x,2) + pow(delta_y,2)) | |
self.get_logger().info("distance_travelled="+str(distance_travelled)) | |
self.get_logger().info("self.x_pos="+str(self.x_pos)) | |
self.get_logger().info("init_x_position="+str(init_x_position)) | |
self.get_logger().info("delta_x="+str(delta_x)) | |
self.get_logger().info("self.y_pos="+str(self.y_pos)) | |
self.get_logger().info("init_y_position="+str(init_y_position)) | |
self.get_logger().info("delta_y="+str(delta_y)) | |
self.stop() | |
self.get_logger().info("############ REACHED DISTANCE = "+str(distance_travelled)) | |
def turn(self, turn_angle_degrees): | |
turn_angle = np.radians(turn_angle_degrees) | |
self.get_logger().info("Init...TURN turn_angle_degrees="+str(turn_angle_degrees)) | |
self.get_logger().info("Init...TURN turn_angle="+str(turn_angle)) | |
angle_turned = 0 | |
init_yaw = self.yaw | |
self.get_logger().info("Init...init_yaw="+str(init_yaw)) | |
# Start moving | |
turn_sign = np.sign([turn_angle])[0] | |
if turn_sign > 0: | |
self.turn_right() | |
else: | |
self.turn_left() | |
# We have to check the abs, to be compatible with both turn right+, turn left negative | |
while abs(angle_turned) < abs(turn_angle): | |
self.wait_for_sec(wait_sec=0.1, delta=0.01) | |
# Check distance incement | |
angle_turned = init_yaw - self.yaw | |
self.get_logger().info("#######") | |
self.get_logger().info("angle_turned="+str(angle_turned)) | |
self.get_logger().info("self.yaw="+str(self.yaw)) | |
self.get_logger().info("#######") | |
self.stop() | |
self.get_logger().info("############ REACHED ANGLE = "+str(angle_turned)) | |
def wait_for_sec(self, wait_sec, delta=0.1): | |
i = 0 | |
while i < wait_sec : | |
# self.get_logger().debug("..."+str(i)) | |
time.sleep(delta) | |
i += delta | |
def scan_callback(self, msg): | |
length_scan_ranges = len(msg.ranges) | |
front_distance_index = int(length_scan_ranges / 2) | |
# We get the front Section, no only one value | |
front_minimum = 30.0 | |
number_of_points = 50 | |
range_subset = msg.ranges[(front_distance_index-number_of_points):(front_distance_index+number_of_points+1)] | |
for value in range_subset: | |
if value < front_minimum: | |
front_minimum = value | |
self.laser_forward = front_minimum | |
self.get_logger().debug('LASER FRONT MIN VALUE: "%s"' % str(self.laser_forward)) | |
def odom_callback(self, msg): | |
self.x_pos = msg.pose.pose.position.x | |
self.y_pos = msg.pose.pose.position.y | |
quaternion_obj = msg.pose.pose.orientation | |
roll, pitch, self.yaw = euler_from_quaternion(quaternion_obj) | |
self.get_logger().debug("["+str(self.x_pos)+","+str(self.y_pos)+","+str(self.yaw)+"]") | |
def main(args=None): | |
# initialize the ROS communication | |
rclpy.init(args=args) | |
try: | |
# declare the node constructor | |
service = Service() | |
executor = MultiThreadedExecutor(num_threads=3) | |
executor.add_node(service) | |
try: | |
# pause the program execution, waits for a request to kill the node (ctrl+c) | |
executor.spin() | |
finally: | |
executor.shutdown() | |
service.destroy_node() | |
finally: | |
# shutdown the ROS communication | |
rclpy.shutdown() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment