Created
June 8, 2023 23:19
-
-
Save Seiwert/f60de82e497304521924482f77d1b6f7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
A script to outline the fundamentals of the moveit_py motion planning API. | |
""" | |
import time | |
# generic ros libraries | |
import rclpy | |
from rclpy.logging import get_logger | |
# moveit python library | |
from moveit.core.robot_state import RobotState | |
from moveit.planning import ( | |
MoveItPy, | |
MultiPipelinePlanRequestParameters, | |
) | |
import time | |
# generic ros libraries | |
import rclpy | |
from rclpy.node import Node | |
from rclpy.logging import get_logger | |
from std_msgs.msg import ColorRGBA | |
from visualization_msgs.msg import Marker | |
from builtin_interfaces.msg import Duration | |
from geometry_msgs.msg import Pose | |
# moveit python library | |
from moveit.core.robot_state import RobotState | |
from moveit.planning import ( | |
MoveItPy, | |
MultiPipelinePlanRequestParameters, | |
) | |
import openai | |
import os | |
import openai | |
import re | |
import numpy as np | |
openai.organization = "[your org here]" | |
openai.api_key = "[your api key here]" | |
class MarkPublisher(Node): | |
def __init__(self): | |
super().__init__('mark_publisher') | |
self.publisher_ = self.create_publisher(Marker, 'marker_publisher_topic', 10) | |
def publisher_marker(self, spot, id): | |
print("==============================") | |
print("HEY WE'RE PUBLISHGIN A MARKER") | |
color = ColorRGBA() | |
color.r = 1.0 | |
color.g = 2.0 | |
color.b = 0.1 | |
color.a = 1.0 | |
time = Duration() | |
time.sec = 100000 | |
time.nanosec = 0 | |
point = Pose() | |
point.position.x = spot[0] | |
point.position.y = spot[1] | |
point.position.z = 0.5 | |
sphere_marker = Marker() | |
sphere_marker.header.frame_id = "world" | |
sphere_marker.ns = "" | |
sphere_marker.id = id | |
sphere_marker.type = Marker.SPHERE | |
sphere_marker.action = Marker.ADD | |
sphere_marker.lifetime = time #Infinite lifetime | |
sphere_marker.scale.x = 0.05 | |
sphere_marker.scale.y = 0.05 | |
sphere_marker.scale.z = 0.05 | |
sphere_marker.color = color | |
sphere_marker.frame_locked = False | |
sphere_marker.pose = point | |
self.publisher_.publish(sphere_marker) | |
def normalize(a, _max=0.3): | |
a = np.array([[x[0], x[1]] for x in a]) | |
a = a - a.min() | |
a = a / a.max() | |
a = a * _max | |
a = a + 0.2 | |
for i in range(100): | |
print(a) | |
return [(x[0], x[1]) for x in a] | |
def get_draw_points(drawing="sad face", openai=openai, norm=True): | |
try: | |
output = openai.ChatCompletion.create( | |
model="gpt-3.5-turbo", | |
messages=[ | |
{"role": "system", "content": "You draw art using (x, y) points. You only output a python list of points [(x, y), ...]."}, | |
{"role": "user", "content": f"Create 10 (x,y) points to draw a {drawing}. Output python list [(x, y), (x, y), ...]"}, | |
{"role": "assistant", "content":"Here's a set of points to draw"}, | |
{"role": "user", "content":"Only output a python list [(x, y), (x, y), ...]"} | |
] | |
) | |
output = output['choices'][0]['message']['content'] | |
print(f"OpenAi Output {output}") | |
# re find all things that match a python list of tuples | |
re_s = r"(\([-|\s]*\d{1,3}\,[-|\s]*\d{1,3}\))" | |
matches = re.findall(re_s, output) | |
print(len(matches)) | |
xy_list = ",".join(matches) | |
xy_list = eval("[" + xy_list + "]") | |
if len(xy_list) == 0: | |
print("Faild retrying") | |
return get_draw_points(drawing=drawing, openai=openai) | |
if norm: | |
xy_list = normalize(xy_list) | |
return xy_list | |
except Exception as ex: | |
print(ex) | |
print("Retrying") | |
return get_draw_points(drawing=drawing, openai=openai) | |
def plan_and_execute( | |
robot, | |
planning_component, | |
logger, | |
single_plan_parameters=None, | |
multi_plan_parameters=None, | |
sleep_time=0.0, | |
): | |
"""Helper function to plan and execute a motion.""" | |
# plan to goal | |
logger.info("Planning trajectory") | |
if multi_plan_parameters is not None: | |
plan_result = planning_component.plan( | |
multi_plan_parameters=multi_plan_parameters | |
) | |
elif single_plan_parameters is not None: | |
plan_result = planning_component.plan( | |
single_plan_parameters=single_plan_parameters | |
) | |
else: | |
plan_result = planning_component.plan() | |
# execute the plan | |
if plan_result: | |
logger.info("Executing plan") | |
robot_trajectory = plan_result.trajectory | |
robot.execute(robot_trajectory, controllers=[]) | |
else: | |
logger.error("Planning failed") | |
time.sleep(sleep_time) | |
def main(args=None): | |
################################################################### | |
# MoveItPy Setup | |
################################################################### | |
rclpy.init(args=args) | |
logger = get_logger("moveit_py.pose_goal") | |
minimal_publisher = MarkPublisher() | |
# instantiate MoveItPy instance and get planning component | |
panda = MoveItPy(node_name="moveit_py") | |
panda_arm = panda.get_planning_component("panda_arm") | |
logger.info("MoveItPy instance created") | |
########################################################################### | |
# Plan 1 - set states with predefined string | |
########################################################################### | |
# set plan start state using predefined state | |
panda_arm.set_start_state(configuration_name="ready") | |
# set pose goal using predefined state | |
panda_arm.set_goal_state(configuration_name="extended") | |
# plan to goal | |
plan_and_execute(panda, panda_arm, logger, sleep_time=3.0) | |
########################################################################### | |
# Plan 3 - set goal state with PoseStamped message | |
########################################################################### | |
points = get_draw_points() | |
#points = [(0.28, -0.2)] * 1000 | |
for i, point in enumerate(points): | |
x, y = point | |
#print(f"Runing point {point}") | |
# set plan start state to current state | |
panda_arm.set_start_state_to_current_state() | |
# set pose goal with PoseStamped message | |
from geometry_msgs.msg import PoseStamped | |
pose_goal = PoseStamped() | |
pose_goal.header.frame_id = "panda_link0" | |
pose_goal.pose.orientation.w = 1.0 | |
pose_goal.pose.position.x = x | |
pose_goal.pose.position.y = y | |
pose_goal.pose.position.z = 0.5 | |
panda_arm.set_goal_state(pose_stamped_msg=pose_goal, pose_link="panda_link8") | |
# plan to goal | |
plan_and_execute(panda, panda_arm, logger, sleep_time=3.0) | |
minimal_publisher.publisher_marker(point, i) | |
panda_arm.set_start_state_to_current_state() | |
panda_arm.set_goal_state(configuration_name="extended") | |
plan_and_execute(panda, panda_arm, logger, sleep_time=3.0) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment