Instantly share code, notes, and snippets.
Created
December 2, 2019 20:48
-
Save blandry/02a9f340206f331777310e3ba772ac93 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import rospy | |
from nav_msgs.msg import OccupancyGrid, MapMetaData, Path | |
from geometry_msgs.msg import Twist, Pose2D, PoseStamped | |
from std_msgs.msg import String, Bool | |
import tf | |
import numpy as np | |
from numpy import linalg | |
from utils import wrapToPi | |
from planners import AStar, compute_smoothed_traj | |
from grids import StochOccupancyGrid2D | |
import scipy.interpolate | |
import matplotlib.pyplot as plt | |
from controllers import PoseController, TrajectoryTracker, HeadingController | |
from enum import Enum | |
from dynamic_reconfigure.server import Server | |
from asl_turtlebot.cfg import NavigatorConfig | |
import smach | |
import smach_ros | |
class Navigator: | |
""" | |
This class handles point to point turtlebot motion, avoiding obstacles. | |
It is the sole node that should publish to cmd_vel | |
""" | |
def __init__(self): | |
# current state | |
self.x = 0.0 | |
self.y = 0.0 | |
self.theta = 0.0 | |
# goal state | |
self.x_g = None | |
self.y_g = None | |
self.theta_g = None | |
self.th_init = 0.0 | |
# map parameters | |
self.map_width = 0 | |
self.map_height = 0 | |
self.map_resolution = 0 | |
self.map_origin = [0,0] | |
self.map_probs = [] | |
self.occupancy = None | |
self.occupancy_updated = False | |
# plan parameters | |
self.plan_resolution = 0.1 | |
self.plan_horizon = 15 | |
# time when we started following the plan | |
self.current_plan_start_time = rospy.get_rostime() | |
self.current_plan_duration = 0 | |
self.plan_start = [0.,0.] | |
# Robot limits | |
self.v_max = 0.2 # maximum velocity | |
self.om_max = 0.4 # maximum angular velocity | |
self.v_des = 0.12 # desired cruising velocity | |
self.theta_start_thresh = 0.05 # threshold in theta to start moving forward when path-following | |
self.start_pos_thresh = 0.2 # threshold to be far enough into the plan to recompute it | |
# threshold at which navigator switches from trajectory to pose control | |
self.near_thresh = 0.2 | |
self.at_thresh = 0.02 | |
self.at_thresh_theta = 0.05 | |
# trajectory smoothing | |
self.spline_alpha = 0.15 | |
self.traj_dt = 0.1 | |
# trajectory tracking controller parameters | |
self.kpx = 0.5 | |
self.kpy = 0.5 | |
self.kdx = 1.5 | |
self.kdy = 1.5 | |
# heading controller parameters | |
self.kp_th = 2. | |
self.traj_controller = TrajectoryTracker(self.kpx, self.kpy, self.kdx, self.kdy, self.v_max, self.om_max) | |
self.pose_controller = PoseController(0., 0., 0., self.v_max, self.om_max) | |
self.heading_controller = HeadingController(self.kp_th, self.om_max) | |
self.nav_planned_path_pub = rospy.Publisher('/planned_path', Path, queue_size=10) | |
self.nav_smoothed_path_pub = rospy.Publisher('/cmd_smoothed_path', Path, queue_size=10) | |
self.nav_smoothed_path_rej_pub = rospy.Publisher('/cmd_smoothed_path_rejected', Path, queue_size=10) | |
self.nav_vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=10) | |
self.trans_listener = tf.TransformListener() | |
self.cfg_srv = Server(NavigatorConfig, self.dyn_cfg_callback) | |
rospy.Subscriber('/map', OccupancyGrid, self.map_callback) | |
rospy.Subscriber('/map_metadata', MapMetaData, self.map_md_callback) | |
rospy.Subscriber('/cmd_nav', Pose2D, self.cmd_nav_callback) | |
self.new_cmd_nav = False | |
self.new_map = False | |
def dyn_cfg_callback(self, config, level): | |
rospy.loginfo("Reconfigure Request: k1:{k1}, k2:{k2}, k3:{k3}".format(**config)) | |
self.pose_controller.k1 = config["k1"] | |
self.pose_controller.k2 = config["k2"] | |
self.pose_controller.k3 = config["k3"] | |
return config | |
def cmd_nav_callback(self, data): | |
""" | |
loads in goal if different from current goal, and replans | |
""" | |
if data.x != self.x_g or data.y != self.y_g or data.theta != self.theta_g: | |
self.x_g = data.x | |
self.y_g = data.y | |
self.theta_g = data.theta | |
self.new_cmd_nav = True | |
def map_md_callback(self, msg): | |
""" | |
receives maps meta data and stores it | |
""" | |
self.map_width = msg.width | |
self.map_height = msg.height | |
self.map_resolution = msg.resolution | |
self.map_origin = (msg.origin.position.x,msg.origin.position.y) | |
def map_callback(self,msg): | |
""" | |
receives new map info and updates the map | |
""" | |
self.map_probs = msg.data | |
# if we've received the map metadata and have a way to update it: | |
if self.map_width>0 and self.map_height>0 and len(self.map_probs)>0: | |
self.occupancy = StochOccupancyGrid2D(self.map_resolution, | |
self.map_width, | |
self.map_height, | |
self.map_origin[0], | |
self.map_origin[1], | |
8, | |
self.map_probs) | |
self.new_map = True | |
def near_goal(self): | |
""" | |
returns whether the robot is close enough in position to the goal to | |
start using the pose controller | |
""" | |
return linalg.norm(np.array([self.x-self.x_g, self.y-self.y_g])) < self.near_thresh | |
def at_goal(self): | |
""" | |
returns whether the robot has reached the goal position with enough | |
accuracy to return to idle state | |
""" | |
return (linalg.norm(np.array([self.x-self.x_g, self.y-self.y_g])) < self.near_thresh and abs(wrapToPi(self.theta - self.theta_g)) < self.at_thresh_theta) | |
def aligned(self): | |
""" | |
returns whether robot is aligned with starting direction of path | |
(enough to switch to tracking controller) | |
""" | |
return (abs(wrapToPi(self.theta - self.th_init)) < self.theta_start_thresh) | |
def close_to_plan_start(self): | |
return (abs(self.x - self.plan_start[0]) < self.start_pos_thresh and abs(self.y - self.plan_start[1]) < self.start_pos_thresh) | |
def snap_to_grid(self, x): | |
return (self.plan_resolution*round(x[0]/self.plan_resolution), self.plan_resolution*round(x[1]/self.plan_resolution)) | |
def publish_planned_path(self, path, publisher): | |
# publish planned plan for visualization | |
path_msg = Path() | |
path_msg.header.frame_id = 'map' | |
for state in path: | |
pose_st = PoseStamped() | |
pose_st.pose.position.x = state[0] | |
pose_st.pose.position.y = state[1] | |
pose_st.pose.orientation.w = 1 | |
pose_st.header.frame_id = 'map' | |
path_msg.poses.append(pose_st) | |
publisher.publish(path_msg) | |
def publish_smoothed_path(self, traj, publisher): | |
# publish planned plan for visualization | |
path_msg = Path() | |
path_msg.header.frame_id = 'map' | |
for i in range(traj.shape[0]): | |
pose_st = PoseStamped() | |
pose_st.pose.position.x = traj[i,0] | |
pose_st.pose.position.y = traj[i,1] | |
pose_st.pose.orientation.w = 1 | |
pose_st.header.frame_id = 'map' | |
path_msg.poses.append(pose_st) | |
publisher.publish(path_msg) | |
def publish_control(self): | |
""" | |
Runs appropriate controller depending on the mode. Assumes all controllers | |
are all properly set up / with the correct goals loaded | |
""" | |
t = self.get_current_plan_time() | |
V, om = self.traj_controller.compute_control(self.x, self.y, self.theta, t) | |
cmd_vel = Twist() | |
cmd_vel.linear.x = V | |
cmd_vel.angular.z = om | |
self.nav_vel_pub.publish(cmd_vel) | |
def get_current_plan_time(self): | |
t = (rospy.get_rostime()-self.current_plan_start_time).to_sec() | |
return max(0.0, t) # clip negative time to 0 | |
def check_inputs(userdata): | |
try: | |
(translation,rotation) = userdata.nav_out.trans_listener.lookupTransform('/map', '/base_footprint', rospy.Time(0)) | |
userdata.nav_out.x = translation[0] | |
userdata.nav_out.y = translation[1] | |
euler = tf.transformations.euler_from_quaternion(rotation) | |
userdata.nav_out.theta = euler[2] | |
except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException) as e: | |
userdata.nav_out.current_plan = [] | |
rospy.loginfo("Navigator: waiting for state info") | |
return 'error' | |
if userdata.nav_in.new_cmd_nav: | |
userdata.nav_out.new_cmd_nav = False | |
return 'new_cmd_nav' | |
elif userdata.nav_in.new_map: | |
userdata.nav_out.new_map = False | |
return 'new_map' | |
return None | |
class Waiting(smach.State): | |
def __init__(self): | |
smach.State.__init__(self, | |
outcomes=['nop', 'new_cmd_nav', 'new_map', 'error'], | |
input_keys=['nav_in', 'nav_out'], | |
output_keys=['nav_out']) | |
def execute(self, userdata): | |
rospy.loginfo('Executing state WAITING') | |
ci_outcome = check_inputs(userdata) | |
if ci_outcome: | |
return ci_outcome | |
# run the state | |
cmd_vel = Twist() | |
cmd_vel.linear.x = 0.0 | |
cmd_vel.angular.z = 0.0 | |
userdata.nav_out.nav_vel_pub.publish(cmd_vel) | |
rospy.sleep(.5) | |
return 'nop' | |
class Planning(smach.State): | |
def __init__(self): | |
smach.State.__init__(self, | |
outcomes=['success', 'failure', 'new_cmd_nav', 'new_map', 'error'], | |
input_keys=['nav_in', 'nav_out'], | |
output_keys=['nav_out']) | |
def execute(self, userdata): | |
rospy.loginfo('Executing state PLANNING') | |
ci_outcome = check_inputs(userdata) | |
if ci_outcome: | |
return ci_outcome | |
if not userdata.nav_in.occupancy: | |
rospy.loginfo("Navigator: replanning canceled, waiting for occupancy map.") | |
return 'failure' | |
# Attempt to plan a path | |
state_min = userdata.nav_out.snap_to_grid((-userdata.nav_in.plan_horizon, -userdata.nav_in.plan_horizon)) | |
state_max = userdata.nav_out.snap_to_grid((userdata.nav_in.plan_horizon, userdata.nav_in.plan_horizon)) | |
x_init = userdata.nav_out.snap_to_grid((userdata.nav_in.x, userdata.nav_in.y)) | |
userdata.nav_out.plan_start = x_init | |
x_goal = userdata.nav_out.snap_to_grid((userdata.nav_in.x_g, userdata.nav_in.y_g)) | |
problem = AStar(state_min,state_max,x_init,x_goal,userdata.nav_out.occupancy,userdata.nav_out.plan_resolution) | |
rospy.loginfo("Navigator: computing navigation plan") | |
success = problem.solve() | |
if not success: | |
rospy.loginfo("Planning failed") | |
return 'failure' | |
rospy.loginfo("Planning Succeeded") | |
planned_path = problem.path | |
# Check whether path is too short | |
if len(planned_path) < 4: | |
rospy.loginfo("Path too short to track") | |
return 'failure' | |
# Smooth and generate a trajectory | |
traj_new, t_new = compute_smoothed_traj(planned_path, userdata.nav_in.v_des, userdata.nav_in.spline_alpha, userdata.nav_in.traj_dt) | |
# Otherwise follow the new plan | |
userdata.nav_out.publish_planned_path(planned_path, userdata.nav_out.nav_planned_path_pub) | |
userdata.nav_out.publish_smoothed_path(traj_new, userdata.nav_out.nav_smoothed_path_pub) | |
userdata.nav_out.pose_controller.load_goal(userdata.nav_in.x_g, userdata.nav_in.y_g, userdata.nav_in.theta_g) | |
userdata.nav_out.traj_controller.load_traj(t_new, traj_new) | |
userdata.nav_out.current_plan_start_time = rospy.get_rostime() | |
userdata.nav_out.current_plan_duration = t_new[-1] | |
userdata.nav_out.th_init = traj_new[0,2] | |
userdata.nav_out.heading_controller.load_goal(traj_new[0,2]) | |
return 'success' | |
class Tracking(smach.State): | |
def __init__(self): | |
smach.State.__init__(self, | |
outcomes=['nop', 'done', 'plan_expired', 'new_cmd_nav', 'new_map', 'error'], | |
input_keys=['nav_in', 'nav_out'], | |
output_keys=['nav_out']) | |
def execute(self, userdata): | |
rospy.loginfo('Executing state TRACKING') | |
ci_outcome = check_inputs(userdata) | |
if ci_outcome: | |
return ci_outcome | |
userdata.nav_out.publish_control() | |
if userdata.nav_out.near_goal(): | |
userdata.nav_out.x_g = None | |
userdata.nav_out.y_g = None | |
userdata.nav_out.theta_g = None | |
return 'done' | |
elif not userdata.nav_out.close_to_plan_start(): | |
rospy.loginfo("replanning because far from start") | |
return 'plan_expired' | |
elif (rospy.get_rostime() - userdata.nav_out.current_plan_start_time).to_sec() > userdata.nav_out.current_plan_duration: | |
rospy.loginfo("replanning because out of time") | |
return 'plan_expired' | |
return 'nop' | |
def main(): | |
rospy.init_node('turtlebot_navigator_smach') | |
# Create a SMACH state machine | |
sm = smach.StateMachine(outcomes=[]) | |
nav = Navigator() | |
sm.userdata.nav = nav | |
remap = remapping={'nav_in': 'nav', | |
'nav_out': 'nav'} | |
# Open the container | |
with sm: | |
# Add states to the container | |
smach.StateMachine.add('WAITING', Waiting(), | |
transitions={'nop':'WAITING', 'new_cmd_nav':'PLANNING', 'new_map':'WAITING', 'error': 'WAITING'}, | |
remapping=remap) | |
smach.StateMachine.add('PLANNING', Planning(), | |
transitions={'success':'TRACKING', 'failure':'WAITING', 'new_cmd_nav':'PLANNING', 'new_map':'PLANNING', 'error': 'WAITING'}, | |
remapping=remap) | |
smach.StateMachine.add('TRACKING', Tracking(), | |
transitions={'nop':'TRACKING', 'done': 'WAITING', 'plan_expired':'PLANNING', 'new_cmd_nav':'PLANNING', 'new_map':'PLANNING', 'error': 'WAITING'}, | |
remapping=remap) | |
sis = smach_ros.IntrospectionServer('turtlebot_navigator_introspection', sm, '/SM_NAV') | |
sis.start() | |
# Execute SMACH plan | |
outcome = sm.execute() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment