Skip to content

Instantly share code, notes, and snippets.

@felixvd
Created October 1, 2020 01:32
Show Gist options
  • Save felixvd/8e11cff9ec533f34b89437bcfd44c64a to your computer and use it in GitHub Desktop.
Save felixvd/8e11cff9ec533f34b89437bcfd44c64a to your computer and use it in GitHub Desktop.
Boilerplate code to activate ROS control script on UR robots ( https://github.com/UniversalRobots/Universal_Robots_ROS_Driver )
# Software License Agreement (BSD License)
#
# Copyright (c) 2020, OMRON SINIC X
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of OMRON SINIC X nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Author: Felix von Drigalski
// [...]
// class member variables
a_bot_get_loaded_program_ = n_.serviceClient<ur_dashboard_msgs::GetLoadedProgram>("/a_bot/ur_hardware_interface/dashboard/get_loaded_program");
a_bot_program_running_ = n_.serviceClient<ur_dashboard_msgs::IsProgramRunning>("/a_bot/ur_hardware_interface/dashboard/program_running");
a_bot_load_program_ = n_.serviceClient<ur_dashboard_msgs::Load>("/a_bot/ur_hardware_interface/dashboard/load_program");
a_bot_play_ = n_.serviceClient<std_srvs::Trigger>("/a_bot/ur_hardware_interface/dashboard/play");
b_bot_get_loaded_program_ = n_.serviceClient<ur_dashboard_msgs::GetLoadedProgram>("/b_bot/ur_hardware_interface/dashboard/get_loaded_program");
b_bot_program_running_ = n_.serviceClient<ur_dashboard_msgs::IsProgramRunning>("/b_bot/ur_hardware_interface/dashboard/program_running");
b_bot_load_program_ = n_.serviceClient<ur_dashboard_msgs::Load>("/b_bot/ur_hardware_interface/dashboard/load_program");
b_bot_play_ = n_.serviceClient<std_srvs::Trigger>("/b_bot/ur_hardware_interface/dashboard/play");
// [...]
bool MyClass::activateROSControlOnUR(std::string robot_name, int recursion_depth)
{
ROS_DEBUG_STREAM("Checking if ros control active on " << robot_name);
if (!use_real_robot_)
return true;
if (robot_name == "a_bot" && a_bot_ros_control_active_)
return true;
else if (robot_name == "b_bot" && b_bot_ros_control_active_)
return true;
else if (robot_name != "a_bot" && robot_name != "b_bot")
{
ROS_ERROR("Robot name was not found or the robot is not a UR!");
return false;
}
// If ROS External Control program is not running on UR, load and start it
ROS_INFO_STREAM("Activating ROS control for robot: " << robot_name);
ros::ServiceClient get_loaded_program, load_program, play;
if (robot_name == "a_bot")
{
get_loaded_program = a_bot_get_loaded_program_;
load_program = a_bot_load_program_;
play = a_bot_play_;
}
else // b_bot
{
get_loaded_program = b_bot_get_loaded_program_;
load_program = b_bot_load_program_;
play = b_bot_play_;
}
// Check if correct program loaded
ur_dashboard_msgs::GetLoadedProgram srv2;
get_loaded_program.call(srv2);
if (srv2.response.program_name != "/programs/ROS_external_control.urp")
{
// Load program
ur_dashboard_msgs::Load srv3;
srv3.request.filename = "ROS_external_control.urp";
load_program.call(srv3);
if (!srv3.response.success)
{
if (recursion_depth < 3) // Reconnect and try again
{
ROS_WARN_STREAM("Could not load ROS_external_control.urp.");
ROS_WARN_STREAM("Trying to reconnect dashboard client and then activating again.");
if (recursion_depth > 0) // If connect alone failed once, try quitting then connecting
{
ros::ServiceClient quit_client = n_.serviceClient<std_srvs::Trigger>("/" + robot_name + "/ur_hardware_interface/dashboard/quit");
std_srvs::Trigger quit;
quit_client.call(quit);
ros::Duration(0.5).sleep();
}
ros::ServiceClient connect_client = n_.serviceClient<std_srvs::Trigger>("/" + robot_name + "/ur_hardware_interface/dashboard/connect");
std_srvs::Trigger conn;
connect_client.call(conn);
ros::Duration(0.5).sleep();
return activateROSControlOnUR(robot_name, recursion_depth+1);
}
else
{
ROS_ERROR_STREAM("Was not able to load ROS_external_control.urp. Is Remote Mode set on the pendant, the UR robot set up correctly and the program installed with the correct name?");
ROS_ERROR_STREAM("service answer: " << srv3.response.answer);
return false;
}
}
}
// Run the program
std_srvs::Trigger srv4;
play.call(srv4);
ros::Duration(2.0).sleep();
if (!srv4.response.success)
{
if (recursion_depth < 3) // Reconnect and try again (this second block is probably redundant)
{
ROS_WARN_STREAM("Could not start ROS_external_control.urp.");
ROS_WARN_STREAM("Trying to reconnect dashboard client and then activating again.");
if (recursion_depth > 0) // If connect alone failed once, try quitting then connecting
{
ros::ServiceClient quit_client = n_.serviceClient<std_srvs::Trigger>("/" + robot_name + "/ur_hardware_interface/dashboard/quit");
std_srvs::Trigger quit;
quit_client.call(quit);
ros::Duration(0.5).sleep();
}
ros::ServiceClient connect_client = n_.serviceClient<std_srvs::Trigger>("/" + robot_name + "/ur_hardware_interface/dashboard/connect");
std_srvs::Trigger conn;
connect_client.call(conn);
ros::Duration(0.5).sleep();
return activateROSControlOnUR(robot_name, recursion_depth+1);
}
else
{
ROS_ERROR_STREAM("Was not able to start ROS_external_control.urp. Is Remote Mode set on the pendant, the UR robot set up correctly and the program installed with the correct name?");
return false;
}
}
ROS_INFO_STREAM("Successfully activated ROS control on robot: " << robot_name);
return true;
}
#!/usr/bin/env python
# Software License Agreement (BSD License)
#
# Copyright (c) 2020, OMRON SINIC X
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of OMRON SINIC X nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Author: Felix von Drigalski
import sys
import copy
import rospy
import ur_dashboard_msgs.msg
import ur_dashboard_msgs.srv
import std_srvs.srv
from std_msgs.msg import String
import ur_msgs.msg
if __name__ == '__main__':
rospy.init_node('helper', anonymous=True)
ur_dashboard_clients = {
"a_bot_get_loaded_program":rospy.ServiceProxy('/a_bot/ur_hardware_interface/dashboard/get_loaded_program', ur_dashboard_msgs.srv.GetLoadedProgram),
"b_bot_get_loaded_program":rospy.ServiceProxy('/b_bot/ur_hardware_interface/dashboard/get_loaded_program', ur_dashboard_msgs.srv.GetLoadedProgram),
"a_bot_program_running":rospy.ServiceProxy('/a_bot/ur_hardware_interface/dashboard/program_running', ur_dashboard_msgs.srv.IsProgramRunning),
"b_bot_program_running":rospy.ServiceProxy('/b_bot/ur_hardware_interface/dashboard/program_running', ur_dashboard_msgs.srv.IsProgramRunning),
"a_bot_load_program":rospy.ServiceProxy('/a_bot/ur_hardware_interface/dashboard/load_program', ur_dashboard_msgs.srv.Load),
"b_bot_load_program":rospy.ServiceProxy('/b_bot/ur_hardware_interface/dashboard/load_program', ur_dashboard_msgs.srv.Load),
"a_bot_play":rospy.ServiceProxy('/a_bot/ur_hardware_interface/dashboard/play', std_srvs.srv.Trigger),
"b_bot_play":rospy.ServiceProxy('/b_bot/ur_hardware_interface/dashboard/play', std_srvs.srv.Trigger),
"a_bot_connect":rospy.ServiceProxy('/a_bot/ur_hardware_interface/dashboard/connect', std_srvs.srv.Trigger),
"b_bot_connect":rospy.ServiceProxy('/b_bot/ur_hardware_interface/dashboard/connect', std_srvs.srv.Trigger),
"a_bot_quit":rospy.ServiceProxy('/a_bot/ur_hardware_interface/dashboard/quit', std_srvs.srv.Trigger),
"b_bot_quit":rospy.ServiceProxy('/b_bot/ur_hardware_interface/dashboard/quit', std_srvs.srv.Trigger)
}
def activate_ros_control_on_ur(robot, recursion_depth=0):
# Check if URCap is already running on UR
try:
response = ur_dashboard_clients[robot + "_program_running"].call(ur_dashboard_msgs.srv.IsProgramRunningRequest())
if response.program_running:
response = ur_dashboard_clients[robot + "_get_loaded_program"].call(ur_dashboard_msgs.srv.GetLoadedProgramRequest())
if response.program_name == "/programs/ROS_external_control.urp":
return True
except: # Try reconnecting and restart the function
response = ur_dashboard_clients[robot + "_quit"].call()
rospy.sleep(.5)
response = ur_dashboard_clients[robot + "_connect"].call()
return activate_ros_control_on_ur(robot, recursion_depth=recursion_depth+1)
# Load program
rospy.loginfo("Activating ROS control on robot " + robot)
request = ur_dashboard_msgs.srv.LoadRequest()
request.filename = "ROS_external_control.urp"
response = ur_dashboard_clients[robot + "_load_program"].call(request)
if not response.success:
rospy.logwarn("Could not load URCap. Trying to reconnect to dashboard server.")
response = ur_dashboard_clients[robot + "_quit"].call()
rospy.sleep(.5)
response = ur_dashboard_clients[robot + "_connect"].call()
if not response.success:
rospy.logerr("Could not start UR control. Is the UR in Remote Control mode and program installed with correct name?")
return False
rospy.loginfo("Successfully connected to " + robot + " dashboard server.")
# Try loading again
request = ur_dashboard_msgs.srv.LoadRequest()
request.filename = "ROS_external_control.urp"
response = ur_dashboard_clients[robot + "_load_program"].call(request)
# Run the program
response = ur_dashboard_clients[robot + "_play"].call(std_srvs.srv.TriggerRequest())
rospy.sleep(2)
if not response.success:
rospy.logerr("Could not start UR control. Is the UR in Remote Control mode and program installed with correct name?")
return False
else:
rospy.loginfo("Successfully activated ROS control on robot " + robot)
return True
activate_ros_control_on_ur("a_bot")
activate_ros_control_on_ur("b_bot")
# class member variables
class MyClass(object):
def __init__(self):
rospy.init_node('mynode', anonymous=False)
# Service clients
self.ur_dashboard_clients = {
"a_bot_get_loaded_program":rospy.ServiceProxy('/a_bot/ur_hardware_interface/dashboard/get_loaded_program', ur_dashboard_msgs.srv.GetLoadedProgram),
"b_bot_get_loaded_program":rospy.ServiceProxy('/b_bot/ur_hardware_interface/dashboard/get_loaded_program', ur_dashboard_msgs.srv.GetLoadedProgram),
"a_bot_load_program":rospy.ServiceProxy('/a_bot/ur_hardware_interface/dashboard/load_program', ur_dashboard_msgs.srv.Load),
"b_bot_load_program":rospy.ServiceProxy('/b_bot/ur_hardware_interface/dashboard/load_program', ur_dashboard_msgs.srv.Load),
"a_bot_play":rospy.ServiceProxy('/a_bot/ur_hardware_interface/dashboard/play', std_srvs.srv.Trigger),
"b_bot_play":rospy.ServiceProxy('/b_bot/ur_hardware_interface/dashboard/play', std_srvs.srv.Trigger),
"a_bot_quit":rospy.ServiceProxy('/a_bot/ur_hardware_interface/dashboard/quit', std_srvs.srv.Trigger),
"b_bot_quit":rospy.ServiceProxy('/b_bot/ur_hardware_interface/dashboard/quit', std_srvs.srv.Trigger),
"a_bot_connect":rospy.ServiceProxy('/a_bot/ur_hardware_interface/dashboard/connect', std_srvs.srv.Trigger),
"b_bot_connect":rospy.ServiceProxy('/b_bot/ur_hardware_interface/dashboard/connect', std_srvs.srv.Trigger)
}
self.sub_a_bot_status_ = rospy.Subscriber("/a_bot/ur_hardware_interface/robot_program_running", Bool, self.a_bot_ros_control_status_callback)
self.sub_b_bot_status_ = rospy.Subscriber("/b_bot/ur_hardware_interface/robot_program_running", Bool, self.b_bot_ros_control_status_callback)
def a_bot_ros_control_status_callback(self, msg):
self.ur_ros_control_running_on_robot["a_bot"] = msg.data
def b_bot_ros_control_status_callback(self, msg):
self.ur_ros_control_running_on_robot["b_bot"] = msg.data
def activate_ros_control_on_ur(self, robot="a_bot"):
# Check if URCap is already running on UR
try:
if self.ur_ros_control_running_on_robot[robot]:
return True
except:
rospy.logerr("Robot name '" + robot + "' was not found or the robot is not a UR!")
return False
# Load program if it not loaded already
rospy.loginfo("Activating ROS control on robot " + robot)
response = self.ur_dashboard_clients[robot + "_get_loaded_program"].call(ur_dashboard_msgs.srv.GetLoadedProgramRequest())
if response.program_name != "/programs/ROS_external_control.urp":
request = ur_dashboard_msgs.srv.LoadRequest()
request.filename = "ROS_external_control.urp"
response = self.ur_dashboard_clients[robot + "_load_program"].call(request)
if not response.success: # Try reconnecting to dashboard
if recursion_depth > 2: # Break out after 3 tries
rospy.logerr("Could not load the ROS_external_control.urp URCap. Is the UR in Remote Control mode and program installed with correct name?")
return False
if recursion_depth > 0: # If connect alone failed, try quit and then connect
response = ur_dashboard_clients[robot + "_quit"].call()
rospy.sleep(.5)
response = ur_dashboard_clients[robot + "_connect"].call()
rospy.sleep(.5)
return activate_ros_control_on_ur(robot, recursion_depth=recursion_depth+1)
# Run the program
response = self.ur_dashboard_clients[robot + "_play"].call(std_srvs.srv.TriggerRequest())
rospy.sleep(2)
if not response.success:
rospy.logerr("Could not start UR control. Is the UR in Remote Control mode and program installed with correct name?")
return False
else:
rospy.loginfo("Successfully activated ROS control on robot " + robot)
return True
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment