Skip to content

Instantly share code, notes, and snippets.

Last active December 4, 2020 21:52
Show Gist options
  • Save Achllle/adeec80c90af32777dfb2b53f2b100f5 to your computer and use it in GitHub Desktop.
Save Achllle/adeec80c90af32777dfb2b53f2b100f5 to your computer and use it in GitHub Desktop.
generic ROS topic to service relay node
#!/usr/bin/env python
convert topic with payload to a service. The topic does not have to exist yet for this to run.
rosrun [some_pkg] /input_topic topic_type /service_address srv_type python_expression
rosrun [some_pkg] /my_trigger_topic std_msgs/Empty /trigger_this std_srvs/Empty ''
rosrun [some_pkg] /set_map_topic nav_msgs/OccupancyGrid /set_map nav_msgs/SetMap "map:"
@author Achille, based on topic_tools/RelayField
from __future__ import print_function
import argparse
import sys
import copy
import yaml
import roslib
import rospy
import rostopic
import genpy
import std_msgs
def _eval_in_dict_impl(dict_, globals_, locals_):
res = copy.deepcopy(dict_)
for k, v in res.items():
type_ = type(v)
if type_ is dict:
res[k] = _eval_in_dict_impl(v, globals_, locals_)
elif (type_ is str) or (type_ is unicode):
res[k] = eval(v, globals_, locals_)
except NameError:
except SyntaxError:
return res
class RelayTopic2Service(object):
def __init__(self):
parser = argparse.ArgumentParser(
'Allows to relay field data from one topic to another.\n\n'
'Usage:\n\trosrun topic_tools relay_topic_to_service '
'<input> <input type> <output service> <output service type> '
'<expression on m>\n\n'
'Example:\n\trosrun topic_tools relay_field '
'/chatter std_msgs/String /bool_set_srv std_srvs/SetBool\n\t'
parser.add_argument('input', help='Input topic or topic field.')
parser.add_argument('input_type', help='Input topic type.')
parser.add_argument('output_service', help='Output service address.')
parser.add_argument('output_type', help='Output service type.')
help='Python expression to apply on the input message \'m\'.'
args, rosargs = parser.parse_known_args()
self.expression = args.expression
input_class, input_topic, self.input_fn = rostopic.get_topic_class(
if input_topic is None:
input_topic = args.input
input_class = roslib.message.get_message_class(args.input_type)
self.output_class = roslib.message.get_service_class(args.output_type)
self.sub = rospy.Subscriber(input_topic, input_class, self.callback)
self.service_client = rospy.ServiceProxy(args.output_service, self.output_class)
def callback(self, m):
if self.input_fn is not None:
m = self.input_fn(m)
rospy.logdebug("received message to convert to srv call: \n\t{}".format(m))
msg_generation = yaml.load(self.expression)
pub_args = _eval_in_dict_impl(msg_generation, None, {'m': m})
except AttributeError:
pub_args = {}
now = rospy.get_rostime()
keys = { 'now': now, 'auto': std_msgs.msg.Header(stamp=now) }
msg = self.output_class._request_class()
genpy.message.fill_message_args(msg, [pub_args], keys=keys)
if __name__ == '__main__':
rospy.init_node('relay_topic_to_service', anonymous=True)
app = RelayTopic2Service()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment