Skip to content

Instantly share code, notes, and snippets.

@blandry
Created December 2, 2019 20:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save blandry/02a9f340206f331777310e3ba772ac93 to your computer and use it in GitHub Desktop.
Save blandry/02a9f340206f331777310e3ba772ac93 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import rospy
from nav_msgs.msg import OccupancyGrid, MapMetaData, Path
from geometry_msgs.msg import Twist, Pose2D, PoseStamped
from std_msgs.msg import String, Bool
import tf
import numpy as np
from numpy import linalg
from utils import wrapToPi
from planners import AStar, compute_smoothed_traj
from grids import StochOccupancyGrid2D
import scipy.interpolate
import matplotlib.pyplot as plt
from controllers import PoseController, TrajectoryTracker, HeadingController
from enum import Enum
from dynamic_reconfigure.server import Server
from asl_turtlebot.cfg import NavigatorConfig
import smach
import smach_ros
class Navigator:
"""
This class handles point to point turtlebot motion, avoiding obstacles.
It is the sole node that should publish to cmd_vel
"""
def __init__(self):
# current state
self.x = 0.0
self.y = 0.0
self.theta = 0.0
# goal state
self.x_g = None
self.y_g = None
self.theta_g = None
self.th_init = 0.0
# map parameters
self.map_width = 0
self.map_height = 0
self.map_resolution = 0
self.map_origin = [0,0]
self.map_probs = []
self.occupancy = None
self.occupancy_updated = False
# plan parameters
self.plan_resolution = 0.1
self.plan_horizon = 15
# time when we started following the plan
self.current_plan_start_time = rospy.get_rostime()
self.current_plan_duration = 0
self.plan_start = [0.,0.]
# Robot limits
self.v_max = 0.2 # maximum velocity
self.om_max = 0.4 # maximum angular velocity
self.v_des = 0.12 # desired cruising velocity
self.theta_start_thresh = 0.05 # threshold in theta to start moving forward when path-following
self.start_pos_thresh = 0.2 # threshold to be far enough into the plan to recompute it
# threshold at which navigator switches from trajectory to pose control
self.near_thresh = 0.2
self.at_thresh = 0.02
self.at_thresh_theta = 0.05
# trajectory smoothing
self.spline_alpha = 0.15
self.traj_dt = 0.1
# trajectory tracking controller parameters
self.kpx = 0.5
self.kpy = 0.5
self.kdx = 1.5
self.kdy = 1.5
# heading controller parameters
self.kp_th = 2.
self.traj_controller = TrajectoryTracker(self.kpx, self.kpy, self.kdx, self.kdy, self.v_max, self.om_max)
self.pose_controller = PoseController(0., 0., 0., self.v_max, self.om_max)
self.heading_controller = HeadingController(self.kp_th, self.om_max)
self.nav_planned_path_pub = rospy.Publisher('/planned_path', Path, queue_size=10)
self.nav_smoothed_path_pub = rospy.Publisher('/cmd_smoothed_path', Path, queue_size=10)
self.nav_smoothed_path_rej_pub = rospy.Publisher('/cmd_smoothed_path_rejected', Path, queue_size=10)
self.nav_vel_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=10)
self.trans_listener = tf.TransformListener()
self.cfg_srv = Server(NavigatorConfig, self.dyn_cfg_callback)
rospy.Subscriber('/map', OccupancyGrid, self.map_callback)
rospy.Subscriber('/map_metadata', MapMetaData, self.map_md_callback)
rospy.Subscriber('/cmd_nav', Pose2D, self.cmd_nav_callback)
self.new_cmd_nav = False
self.new_map = False
def dyn_cfg_callback(self, config, level):
rospy.loginfo("Reconfigure Request: k1:{k1}, k2:{k2}, k3:{k3}".format(**config))
self.pose_controller.k1 = config["k1"]
self.pose_controller.k2 = config["k2"]
self.pose_controller.k3 = config["k3"]
return config
def cmd_nav_callback(self, data):
"""
loads in goal if different from current goal, and replans
"""
if data.x != self.x_g or data.y != self.y_g or data.theta != self.theta_g:
self.x_g = data.x
self.y_g = data.y
self.theta_g = data.theta
self.new_cmd_nav = True
def map_md_callback(self, msg):
"""
receives maps meta data and stores it
"""
self.map_width = msg.width
self.map_height = msg.height
self.map_resolution = msg.resolution
self.map_origin = (msg.origin.position.x,msg.origin.position.y)
def map_callback(self,msg):
"""
receives new map info and updates the map
"""
self.map_probs = msg.data
# if we've received the map metadata and have a way to update it:
if self.map_width>0 and self.map_height>0 and len(self.map_probs)>0:
self.occupancy = StochOccupancyGrid2D(self.map_resolution,
self.map_width,
self.map_height,
self.map_origin[0],
self.map_origin[1],
8,
self.map_probs)
self.new_map = True
def near_goal(self):
"""
returns whether the robot is close enough in position to the goal to
start using the pose controller
"""
return linalg.norm(np.array([self.x-self.x_g, self.y-self.y_g])) < self.near_thresh
def at_goal(self):
"""
returns whether the robot has reached the goal position with enough
accuracy to return to idle state
"""
return (linalg.norm(np.array([self.x-self.x_g, self.y-self.y_g])) < self.near_thresh and abs(wrapToPi(self.theta - self.theta_g)) < self.at_thresh_theta)
def aligned(self):
"""
returns whether robot is aligned with starting direction of path
(enough to switch to tracking controller)
"""
return (abs(wrapToPi(self.theta - self.th_init)) < self.theta_start_thresh)
def close_to_plan_start(self):
return (abs(self.x - self.plan_start[0]) < self.start_pos_thresh and abs(self.y - self.plan_start[1]) < self.start_pos_thresh)
def snap_to_grid(self, x):
return (self.plan_resolution*round(x[0]/self.plan_resolution), self.plan_resolution*round(x[1]/self.plan_resolution))
def publish_planned_path(self, path, publisher):
# publish planned plan for visualization
path_msg = Path()
path_msg.header.frame_id = 'map'
for state in path:
pose_st = PoseStamped()
pose_st.pose.position.x = state[0]
pose_st.pose.position.y = state[1]
pose_st.pose.orientation.w = 1
pose_st.header.frame_id = 'map'
path_msg.poses.append(pose_st)
publisher.publish(path_msg)
def publish_smoothed_path(self, traj, publisher):
# publish planned plan for visualization
path_msg = Path()
path_msg.header.frame_id = 'map'
for i in range(traj.shape[0]):
pose_st = PoseStamped()
pose_st.pose.position.x = traj[i,0]
pose_st.pose.position.y = traj[i,1]
pose_st.pose.orientation.w = 1
pose_st.header.frame_id = 'map'
path_msg.poses.append(pose_st)
publisher.publish(path_msg)
def publish_control(self):
"""
Runs appropriate controller depending on the mode. Assumes all controllers
are all properly set up / with the correct goals loaded
"""
t = self.get_current_plan_time()
V, om = self.traj_controller.compute_control(self.x, self.y, self.theta, t)
cmd_vel = Twist()
cmd_vel.linear.x = V
cmd_vel.angular.z = om
self.nav_vel_pub.publish(cmd_vel)
def get_current_plan_time(self):
t = (rospy.get_rostime()-self.current_plan_start_time).to_sec()
return max(0.0, t) # clip negative time to 0
def check_inputs(userdata):
try:
(translation,rotation) = userdata.nav_out.trans_listener.lookupTransform('/map', '/base_footprint', rospy.Time(0))
userdata.nav_out.x = translation[0]
userdata.nav_out.y = translation[1]
euler = tf.transformations.euler_from_quaternion(rotation)
userdata.nav_out.theta = euler[2]
except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException) as e:
userdata.nav_out.current_plan = []
rospy.loginfo("Navigator: waiting for state info")
return 'error'
if userdata.nav_in.new_cmd_nav:
userdata.nav_out.new_cmd_nav = False
return 'new_cmd_nav'
elif userdata.nav_in.new_map:
userdata.nav_out.new_map = False
return 'new_map'
return None
class Waiting(smach.State):
def __init__(self):
smach.State.__init__(self,
outcomes=['nop', 'new_cmd_nav', 'new_map', 'error'],
input_keys=['nav_in', 'nav_out'],
output_keys=['nav_out'])
def execute(self, userdata):
rospy.loginfo('Executing state WAITING')
ci_outcome = check_inputs(userdata)
if ci_outcome:
return ci_outcome
# run the state
cmd_vel = Twist()
cmd_vel.linear.x = 0.0
cmd_vel.angular.z = 0.0
userdata.nav_out.nav_vel_pub.publish(cmd_vel)
rospy.sleep(.5)
return 'nop'
class Planning(smach.State):
def __init__(self):
smach.State.__init__(self,
outcomes=['success', 'failure', 'new_cmd_nav', 'new_map', 'error'],
input_keys=['nav_in', 'nav_out'],
output_keys=['nav_out'])
def execute(self, userdata):
rospy.loginfo('Executing state PLANNING')
ci_outcome = check_inputs(userdata)
if ci_outcome:
return ci_outcome
if not userdata.nav_in.occupancy:
rospy.loginfo("Navigator: replanning canceled, waiting for occupancy map.")
return 'failure'
# Attempt to plan a path
state_min = userdata.nav_out.snap_to_grid((-userdata.nav_in.plan_horizon, -userdata.nav_in.plan_horizon))
state_max = userdata.nav_out.snap_to_grid((userdata.nav_in.plan_horizon, userdata.nav_in.plan_horizon))
x_init = userdata.nav_out.snap_to_grid((userdata.nav_in.x, userdata.nav_in.y))
userdata.nav_out.plan_start = x_init
x_goal = userdata.nav_out.snap_to_grid((userdata.nav_in.x_g, userdata.nav_in.y_g))
problem = AStar(state_min,state_max,x_init,x_goal,userdata.nav_out.occupancy,userdata.nav_out.plan_resolution)
rospy.loginfo("Navigator: computing navigation plan")
success = problem.solve()
if not success:
rospy.loginfo("Planning failed")
return 'failure'
rospy.loginfo("Planning Succeeded")
planned_path = problem.path
# Check whether path is too short
if len(planned_path) < 4:
rospy.loginfo("Path too short to track")
return 'failure'
# Smooth and generate a trajectory
traj_new, t_new = compute_smoothed_traj(planned_path, userdata.nav_in.v_des, userdata.nav_in.spline_alpha, userdata.nav_in.traj_dt)
# Otherwise follow the new plan
userdata.nav_out.publish_planned_path(planned_path, userdata.nav_out.nav_planned_path_pub)
userdata.nav_out.publish_smoothed_path(traj_new, userdata.nav_out.nav_smoothed_path_pub)
userdata.nav_out.pose_controller.load_goal(userdata.nav_in.x_g, userdata.nav_in.y_g, userdata.nav_in.theta_g)
userdata.nav_out.traj_controller.load_traj(t_new, traj_new)
userdata.nav_out.current_plan_start_time = rospy.get_rostime()
userdata.nav_out.current_plan_duration = t_new[-1]
userdata.nav_out.th_init = traj_new[0,2]
userdata.nav_out.heading_controller.load_goal(traj_new[0,2])
return 'success'
class Tracking(smach.State):
def __init__(self):
smach.State.__init__(self,
outcomes=['nop', 'done', 'plan_expired', 'new_cmd_nav', 'new_map', 'error'],
input_keys=['nav_in', 'nav_out'],
output_keys=['nav_out'])
def execute(self, userdata):
rospy.loginfo('Executing state TRACKING')
ci_outcome = check_inputs(userdata)
if ci_outcome:
return ci_outcome
userdata.nav_out.publish_control()
if userdata.nav_out.near_goal():
userdata.nav_out.x_g = None
userdata.nav_out.y_g = None
userdata.nav_out.theta_g = None
return 'done'
elif not userdata.nav_out.close_to_plan_start():
rospy.loginfo("replanning because far from start")
return 'plan_expired'
elif (rospy.get_rostime() - userdata.nav_out.current_plan_start_time).to_sec() > userdata.nav_out.current_plan_duration:
rospy.loginfo("replanning because out of time")
return 'plan_expired'
return 'nop'
def main():
rospy.init_node('turtlebot_navigator_smach')
# Create a SMACH state machine
sm = smach.StateMachine(outcomes=[])
nav = Navigator()
sm.userdata.nav = nav
remap = remapping={'nav_in': 'nav',
'nav_out': 'nav'}
# Open the container
with sm:
# Add states to the container
smach.StateMachine.add('WAITING', Waiting(),
transitions={'nop':'WAITING', 'new_cmd_nav':'PLANNING', 'new_map':'WAITING', 'error': 'WAITING'},
remapping=remap)
smach.StateMachine.add('PLANNING', Planning(),
transitions={'success':'TRACKING', 'failure':'WAITING', 'new_cmd_nav':'PLANNING', 'new_map':'PLANNING', 'error': 'WAITING'},
remapping=remap)
smach.StateMachine.add('TRACKING', Tracking(),
transitions={'nop':'TRACKING', 'done': 'WAITING', 'plan_expired':'PLANNING', 'new_cmd_nav':'PLANNING', 'new_map':'PLANNING', 'error': 'WAITING'},
remapping=remap)
sis = smach_ros.IntrospectionServer('turtlebot_navigator_introspection', sm, '/SM_NAV')
sis.start()
# Execute SMACH plan
outcome = sm.execute()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment