Last active
December 4, 2020 21:52
-
-
Save Achllle/adeec80c90af32777dfb2b53f2b100f5 to your computer and use it in GitHub Desktop.
generic ROS topic to service relay node
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
Relay every message received on a topic (with or without payload) into a call to a service. The topic does not have to exist yet for this to run.
Usage: | |
rosrun [some_pkg] relay_topic2service.py /input_topic topic_type /service_address srv_type python_expression | |
e.g. | |
rosrun [some_pkg] relay_topic2service.py /my_trigger_topic std_msgs/Empty /trigger_this std_srvs/Empty '' | |
or | |
rosrun [some_pkg] relay_topic2service.py /set_map_topic nav_msgs/OccupancyGrid /set_map nav_msgs/SetMap "map: m.map" | |
@author Achille, based on topic_tools/RelayField | |
""" | |
from __future__ import print_function | |
import argparse | |
import sys | |
import copy | |
import yaml | |
import roslib | |
import rospy | |
import rostopic | |
import genpy | |
import std_msgs | |
# Text types whose values are treated as Python expressions: ``str`` plus, on
# Python 2, ``unicode`` (``type(u'')`` is just ``str`` again on Python 3, where
# the bare name ``unicode`` does not exist).
_STRING_TYPES = (str, type(u''))


def _eval_in_dict_impl(dict_, globals_, locals_):
    """Return a deep copy of ``dict_`` with its string values evaluated.

    Every string value is passed to ``eval`` with the given namespaces and
    replaced by the result. Strings that fail with ``NameError`` or
    ``SyntaxError`` are kept verbatim, so plain text survives untouched.
    Nested dictionaries are processed recursively; values of any other type
    are copied as they are. The input mapping is never modified.

    Args:
        dict_: Mapping of field names to values, or ``None`` (treated as an
            empty mapping).
        globals_: Globals namespace handed to ``eval`` (may be ``None``).
        locals_: Locals namespace handed to ``eval``.

    Returns:
        A new dictionary holding the evaluated values.

    Raises:
        AttributeError: If ``dict_`` is neither ``None`` nor a mapping, or if
            an evaluated expression accesses a missing attribute.
    """
    if dict_ is None:
        return {}
    res = copy.deepcopy(dict_)
    # Iterate over a snapshot because values are replaced inside the loop.
    for key, value in list(res.items()):
        if isinstance(value, dict):
            res[key] = _eval_in_dict_impl(value, globals_, locals_)
        elif isinstance(value, _STRING_TYPES):
            try:
                # NOTE(security): eval executes arbitrary code; only feed this
                # function expressions that come from a trusted source.
                res[key] = eval(value, globals_, locals_)
            except (NameError, SyntaxError):
                # Not a valid expression in this namespace: keep the literal.
                pass
    return res
class RelayTopic2Service(object):
    """Relay every message received on a topic into a call to a service.

    The command line supplies the input topic and its type, the service
    address and its type, and a YAML mapping whose string values are Python
    expressions over the received message ``m``. For each incoming message the
    expressions are evaluated and used to fill the service request, which is
    then sent through a ``rospy.ServiceProxy``.
    """

    def __init__(self):
        """Parse the command line and create the subscriber and service proxy.

        Raises:
            ValueError: If the input message type or the service type cannot
                be resolved to a class.
            yaml.YAMLError: If the expression is not valid YAML.
        """
        parser = argparse.ArgumentParser(
            formatter_class=argparse.RawTextHelpFormatter,
            description=(
                'Relays messages received on a topic to calls of a service.'
                '\n\n'
                'Usage:\n\trosrun <pkg> relay_topic2service.py '
                '<input> <input type> <output service> <output service type> '
                '<expression on m>\n\n'
                'Example:\n\trosrun <pkg> relay_topic2service.py '
                '/flag std_msgs/Bool /bool_set_srv std_srvs/SetBool\n\t'
                '"data: m.data"\n\n'
            )
        )
        parser.add_argument('input', help='Input topic or topic field.')
        parser.add_argument('input_type', help='Input topic type.')
        parser.add_argument('output_service', help='Output service address.')
        parser.add_argument('output_type', help='Output service type.')
        parser.add_argument(
            'expression',
            help='Python expression to apply on the input message \'m\'.'
        )
        # Unrecognised arguments are tolerated and ignored.
        args, _ = parser.parse_known_args()

        self.expression = args.expression
        # Parse the expression once instead of on every message. safe_load
        # never constructs arbitrary Python objects and, unlike a bare
        # yaml.load, needs no explicit Loader argument. An empty expression
        # parses to None and stands for an empty request.
        template = yaml.safe_load(self.expression)
        self._request_template = {} if template is None else template

        input_class, input_topic, self.input_fn = rostopic.get_topic_class(
            args.input)
        if input_topic is None:
            # The topic could not be resolved; fall back to the name and type
            # given on the command line.
            input_topic = args.input
            input_class = roslib.message.get_message_class(args.input_type)
            if input_class is None:
                raise ValueError(
                    'Unknown input message type: {}'.format(args.input_type))

        self.output_class = roslib.message.get_service_class(args.output_type)
        if self.output_class is None:
            raise ValueError(
                'Unknown output service type: {}'.format(args.output_type))

        self.sub = rospy.Subscriber(input_topic, input_class, self.callback)
        self.service_client = rospy.ServiceProxy(
            args.output_service, self.output_class)

    def callback(self, m):
        """Build a service request from message ``m`` and call the service.

        Args:
            m: Message received on the input topic. When ``input_fn`` is set
                it is applied to the message first.
        """
        if self.input_fn is not None:
            m = self.input_fn(m)
        rospy.logdebug("received message to convert to srv call: \n\t{}".format(m))

        try:
            pub_args = _eval_in_dict_impl(
                self._request_template, None, {'m': m})
        except AttributeError as exc:
            # Raised when the expression is not a mapping or references a
            # field the message does not have. Keep the original fallback to
            # an empty request, but make the problem visible.
            rospy.logwarn(
                "could not evaluate expression '{}' ({}); "
                "calling service with default request".format(
                    self.expression, exc))
            pub_args = {}

        # Substitution keys handed to fill_message_args.
        now = rospy.get_rostime()
        keys = {'now': now, 'auto': std_msgs.msg.Header(stamp=now)}

        request = self.output_class._request_class()
        genpy.message.fill_message_args(request, [pub_args], keys=keys)
        try:
            self.service_client(request)
        except rospy.ServiceException as exc:
            # Report a failed call explicitly.
            rospy.logerr("service call failed: {}".format(exc))
def main():
    """Initialise the relay node, create the relay and hand over to rospy.spin()."""
    rospy.init_node('relay_topic_to_service', anonymous=True)
    # Keep a reference to the relay object for as long as the node spins.
    _relay = RelayTopic2Service()
    rospy.spin()


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment