Skip to content

Instantly share code, notes, and snippets.

@Seiwert
Created June 8, 2023 23:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Seiwert/f60de82e497304521924482f77d1b6f7 to your computer and use it in GitHub Desktop.
Save Seiwert/f60de82e497304521924482f77d1b6f7 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
A script to outline the fundamentals of the moveit_py motion planning API.
"""
import time
# generic ros libraries
import rclpy
from rclpy.logging import get_logger
# moveit python library
from moveit.core.robot_state import RobotState
from moveit.planning import (
MoveItPy,
MultiPipelinePlanRequestParameters,
)
import time
# generic ros libraries
import rclpy
from rclpy.node import Node
from rclpy.logging import get_logger
from std_msgs.msg import ColorRGBA
from visualization_msgs.msg import Marker
from builtin_interfaces.msg import Duration
from geometry_msgs.msg import Pose
# moveit python library
from moveit.core.robot_state import RobotState
from moveit.planning import (
MoveItPy,
MultiPipelinePlanRequestParameters,
)
import openai
import os
import openai
import re
import numpy as np
openai.organization = "[your org here]"
openai.api_key = "[your api key here]"
class MarkPublisher(Node):
def __init__(self):
super().__init__('mark_publisher')
self.publisher_ = self.create_publisher(Marker, 'marker_publisher_topic', 10)
def publisher_marker(self, spot, id):
print("==============================")
print("HEY WE'RE PUBLISHGIN A MARKER")
color = ColorRGBA()
color.r = 1.0
color.g = 2.0
color.b = 0.1
color.a = 1.0
time = Duration()
time.sec = 100000
time.nanosec = 0
point = Pose()
point.position.x = spot[0]
point.position.y = spot[1]
point.position.z = 0.5
sphere_marker = Marker()
sphere_marker.header.frame_id = "world"
sphere_marker.ns = ""
sphere_marker.id = id
sphere_marker.type = Marker.SPHERE
sphere_marker.action = Marker.ADD
sphere_marker.lifetime = time #Infinite lifetime
sphere_marker.scale.x = 0.05
sphere_marker.scale.y = 0.05
sphere_marker.scale.z = 0.05
sphere_marker.color = color
sphere_marker.frame_locked = False
sphere_marker.pose = point
self.publisher_.publish(sphere_marker)
def normalize(a, _max=0.3):
a = np.array([[x[0], x[1]] for x in a])
a = a - a.min()
a = a / a.max()
a = a * _max
a = a + 0.2
for i in range(100):
print(a)
return [(x[0], x[1]) for x in a]
def get_draw_points(drawing="sad face", openai=openai, norm=True):
try:
output = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "You draw art using (x, y) points. You only output a python list of points [(x, y), ...]."},
{"role": "user", "content": f"Create 10 (x,y) points to draw a {drawing}. Output python list [(x, y), (x, y), ...]"},
{"role": "assistant", "content":"Here's a set of points to draw"},
{"role": "user", "content":"Only output a python list [(x, y), (x, y), ...]"}
]
)
output = output['choices'][0]['message']['content']
print(f"OpenAi Output {output}")
# re find all things that match a python list of tuples
re_s = r"(\([-|\s]*\d{1,3}\,[-|\s]*\d{1,3}\))"
matches = re.findall(re_s, output)
print(len(matches))
xy_list = ",".join(matches)
xy_list = eval("[" + xy_list + "]")
if len(xy_list) == 0:
print("Faild retrying")
return get_draw_points(drawing=drawing, openai=openai)
if norm:
xy_list = normalize(xy_list)
return xy_list
except Exception as ex:
print(ex)
print("Retrying")
return get_draw_points(drawing=drawing, openai=openai)
def plan_and_execute(
robot,
planning_component,
logger,
single_plan_parameters=None,
multi_plan_parameters=None,
sleep_time=0.0,
):
"""Helper function to plan and execute a motion."""
# plan to goal
logger.info("Planning trajectory")
if multi_plan_parameters is not None:
plan_result = planning_component.plan(
multi_plan_parameters=multi_plan_parameters
)
elif single_plan_parameters is not None:
plan_result = planning_component.plan(
single_plan_parameters=single_plan_parameters
)
else:
plan_result = planning_component.plan()
# execute the plan
if plan_result:
logger.info("Executing plan")
robot_trajectory = plan_result.trajectory
robot.execute(robot_trajectory, controllers=[])
else:
logger.error("Planning failed")
time.sleep(sleep_time)
def main(args=None):
###################################################################
# MoveItPy Setup
###################################################################
rclpy.init(args=args)
logger = get_logger("moveit_py.pose_goal")
minimal_publisher = MarkPublisher()
# instantiate MoveItPy instance and get planning component
panda = MoveItPy(node_name="moveit_py")
panda_arm = panda.get_planning_component("panda_arm")
logger.info("MoveItPy instance created")
###########################################################################
# Plan 1 - set states with predefined string
###########################################################################
# set plan start state using predefined state
panda_arm.set_start_state(configuration_name="ready")
# set pose goal using predefined state
panda_arm.set_goal_state(configuration_name="extended")
# plan to goal
plan_and_execute(panda, panda_arm, logger, sleep_time=3.0)
###########################################################################
# Plan 3 - set goal state with PoseStamped message
###########################################################################
points = get_draw_points()
#points = [(0.28, -0.2)] * 1000
for i, point in enumerate(points):
x, y = point
#print(f"Runing point {point}")
# set plan start state to current state
panda_arm.set_start_state_to_current_state()
# set pose goal with PoseStamped message
from geometry_msgs.msg import PoseStamped
pose_goal = PoseStamped()
pose_goal.header.frame_id = "panda_link0"
pose_goal.pose.orientation.w = 1.0
pose_goal.pose.position.x = x
pose_goal.pose.position.y = y
pose_goal.pose.position.z = 0.5
panda_arm.set_goal_state(pose_stamped_msg=pose_goal, pose_link="panda_link8")
# plan to goal
plan_and_execute(panda, panda_arm, logger, sleep_time=3.0)
minimal_publisher.publisher_marker(point, i)
panda_arm.set_start_state_to_current_state()
panda_arm.set_goal_state(configuration_name="extended")
plan_and_execute(panda, panda_arm, logger, sleep_time=3.0)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment