Skip to content

Instantly share code, notes, and snippets.

@alesolano
Created May 8, 2019 14:15
Show Gist options
  • Save alesolano/7a7af142cdbaf547b2b5d76f0c536f63 to your computer and use it in GitHub Desktop.
Save alesolano/7a7af142cdbaf547b2b5d76f0c536f63 to your computer and use it in GitHub Desktop.
from action_msgs.msg import GoalStatus
from hrim_actuator_rotaryservo_actions.action import GoalJointTrajectory
from trajectory_msgs.msg import JointTrajectoryPoint
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
class MinimalActionClient(Node):
def __init__(self):
super().__init__('mara_minimal_action_client')
self._action_client = ActionClient(self, GoalJointTrajectory, '/hrim_actuation_servomotor_000000000001/trajectory_axis1')
def send_goal(self):
self.get_logger().info('Waiting for action server...')
self._action_client.wait_for_server()
goal_msg = GoalJointTrajectory.Goal()
# ?
goal_msg.trajectory.joint_names += ["motor1"]
# Define three trajectory points
point1 = point2 = point3 = JointTrajectoryPoint();
# Define point1 in pos=0 at time=0.
point1.positions += [0.]
point1.velocities += [0.]
point1.time_from_start.sec = 0
# Define point1 in pos=0 at time=5.
point2.positions += [0.]
point2.time_from_start.sec = 5
# Define point3 in pos=0 at time=10.
point3.positions += [0.]
point3.velocities += [0.]
point3.time_from_start.sec = 10
# Add points to trajectory
goal_msg.trajectory.points = [point1, point2, point3]
self.get_logger().info('Sending goal request...')
self._send_goal_future = self._action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected')
return
self.get_logger().info('Goal accepted')
self.get_logger().info("Goal stamp: {}".format(goal_handle.stamp.sec))
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
# TODO: It gets stuck here.
def feedback_callback(self, feedback):
self.get_logger().info('Received feedback: {0}'.format(feedback.feedback.sequence))
time_from_start = feedback.actual.time_from_start.sec + feedback.actual.time_from_start.nanosec/1e+9
time_from_start_desired = feedback.desired.time_from_start.sec + feedback.desired.time_from_start.nanosec/1e+9
time_from_start_error = feedback.error.time_from_start.sec + feedback.error.time_from_start.nanosec/1e+9
self.get_logger().info("Current degrees: {}\ttime: {}".format(
feedback.actual.positions[0]*180/3.1416,
time_from_start))
self.get_logger().info("Desired degrees: {}\ttime: {}".format(
feedback.desired.positions[0]*180/3.1416,
time_from_start_desired))
self.get_logger().info("Error degrees: {}\ttime: {}".format(
feedback.error.positions[0]*180/3.1416,
time_from_start_error))
self.get_logger().info("------------------------")
def get_result_callback(self, future):
result = future.result().result
status = future.result().status
if status == GoalStatus.STATUS_SUCCEEDED:
self.get_logger().info('Goal succeeded! Result: {0}'.format(result.sequence))
else:
self.get_logger().info('Goal failed with status: {0}'.format(status))
# Shutdown after receiving a result
rclpy.shutdown()
def main(args=None):
rclpy.init(args=args)
action_client = MinimalActionClient()
action_client.send_goal()
rclpy.spin(action_client)
action_client.destroy_node()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment