Skip to content

Instantly share code, notes, and snippets.

@astraw
Created July 2, 2011 04:46
Show Gist options
  • Save astraw/1059742 to your computer and use it in GitHub Desktop.
Save astraw/1059742 to your computer and use it in GitHub Desktop.
subname enhanced ROS dynamic reconfigure server
# Software License Agreement (BSD License)
#
# Copyright (c) 2009, Willow Garage, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
# * Neither the name of Willow Garage, Inc. nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
"""
Python client API for dynamic_reconfigure (L{DynamicReconfigureClient}) as well as
example server implementation (L{DynamicReconfigureServer}).
"""
# This is an enhanced version of
# driver_common/dynamic_reconfigure/src/dynamic_reconfigure/server.py
# to support the "subname" parameter.
from __future__ import with_statement
import roslib; roslib.load_manifest('dynamic_reconfigure')
import rospy
import rosservice
import threading
import time
from dynamic_reconfigure import DynamicReconfigureCallbackException
from dynamic_reconfigure.srv import Reconfigure as ReconfigureSrv
from dynamic_reconfigure.msg import Config as ConfigMsg
from dynamic_reconfigure.msg import ConfigDescription as ConfigDescrMsg
from dynamic_reconfigure.msg import IntParameter, BoolParameter, StrParameter, DoubleParameter, ParamDescription
from dynamic_reconfigure.encoding import *
class Server(object):
def __init__(self, type, callback, subname = None):
self.mutex = threading.Lock()
self.type = type
self.config = type.defaults.copy() # (see https://code.ros.org/trac/ros/ticket/3394 )
self.subname = subname
if subname is None:
self._prefix = '~'
else:
self._prefix = subname + '/'
self.description = encode_description(type)
self._copy_from_parameter_server()
self.callback = callback
self._clamp(self.config)
self.descr_topic = rospy.Publisher(self._prefix+'parameter_descriptions', ConfigDescrMsg, latch=True)
self.descr_topic.publish(self.description);
self.update_topic = rospy.Publisher(self._prefix+'parameter_updates', ConfigMsg, latch=True)
self._change_config(self.config, type.all_level)
self.set_service = rospy.Service(self._prefix+'set_parameters', ReconfigureSrv, self._set_callback)
def update_configuration(self, changes):
with self.mutex:
new_config = dict(self.config)
new_config.update(changes)
self._clamp(new_config)
return self._change_config(new_config, self._calc_level(new_config, self.config))
def _copy_from_parameter_server(self):
for param in self.type.config_description:
try:
self.config[param['name']] = rospy.get_param(self._prefix + param['name'])
except KeyError:
pass
def _copy_to_parameter_server(self):
for param in self.type.config_description:
rospy.set_param(self._prefix + param['name'], self.config[param['name']])
def _change_config(self, config, level):
if self.subname != '':
# call the callback with the subname
self.config = self.callback(config, level, subname=self.subname)
else:
# backwards-compatible call to callback
self.config = self.callback(config, level)
if self.config is None:
msg = 'Reconfigure callback should return a possibly updated configuration.'
rospy.logerr(msg)
raise DynamicReconfigureCallbackException(msg)
self._copy_to_parameter_server()
self.update_topic.publish(encode_config(self.config))
return self.config
def _calc_level(self, config1, config2):
level = 0
for param in self.type.config_description:
if config1[param['name']] != config2[param['name']]:
level |= param['level']
return level
def _clamp(self, config):
for param in self.type.config_description:
maxval = self.type.max[param['name']]
minval = self.type.min[param['name']]
val = config[param['name']]
if val > maxval and maxval != "":
config[param['name']] = maxval
elif val < minval and minval != "":
config[param['name']] = minval
def _set_callback(self, req):
return encode_config(self.update_configuration(decode_config(req.config)))
--- dynamic_reconfigure/src/dynamic_reconfigure/server.py 2012-08-17 09:30:37.331027429 +0200
+++ /tmp/dynamic_reconfigure_server2.py 2012-08-17 09:26:57.360940524 +0200
@@ -35,6 +35,10 @@
example server implementation (L{DynamicReconfigureServer}).
"""
+# This is an enhanced version of
+# driver_common/dynamic_reconfigure/src/dynamic_reconfigure/server.py
+# to support the "subname" parameter.
+
from __future__ import with_statement
import roslib; roslib.load_manifest('dynamic_reconfigure')
@@ -50,22 +54,29 @@
from dynamic_reconfigure.encoding import *
class Server(object):
- def __init__(self, type, callback):
+ def __init__(self, type, callback, subname = None):
self.mutex = threading.Lock()
self.type = type
- self.config = type.defaults
+ self.config = type.defaults.copy() # (see https://code.ros.org/trac/ros/ticket/3394 )
+
+ self.subname = subname
+ if subname is None:
+ self._prefix = '~'
+ else:
+ self._prefix = subname + '/'
+
self.description = encode_description(type)
self._copy_from_parameter_server()
self.callback = callback
self._clamp(self.config)
- self.descr_topic = rospy.Publisher('~parameter_descriptions', ConfigDescrMsg, latch=True)
+ self.descr_topic = rospy.Publisher(self._prefix+'parameter_descriptions', ConfigDescrMsg, latch=True)
self.descr_topic.publish(self.description);
- self.update_topic = rospy.Publisher('~parameter_updates', ConfigMsg, latch=True)
+ self.update_topic = rospy.Publisher(self._prefix+'parameter_updates', ConfigMsg, latch=True)
self._change_config(self.config, type.all_level)
- self.set_service = rospy.Service('~set_parameters', ReconfigureSrv, self._set_callback)
+ self.set_service = rospy.Service(self._prefix+'set_parameters', ReconfigureSrv, self._set_callback)
def update_configuration(self, changes):
with self.mutex:
@@ -77,16 +88,21 @@
def _copy_from_parameter_server(self):
for param in self.type.config_description:
try:
- self.config[param['name']] = rospy.get_param("~" + param['name'])
+ self.config[param['name']] = rospy.get_param(self._prefix + param['name'])
except KeyError:
pass
def _copy_to_parameter_server(self):
for param in self.type.config_description:
- rospy.set_param('~' + param['name'], self.config[param['name']])
+ rospy.set_param(self._prefix + param['name'], self.config[param['name']])
def _change_config(self, config, level):
- self.config = self.callback(config, level)
+ if self.subname != '':
+ # call the callback with the subname
+ self.config = self.callback(config, level, subname=self.subname)
+ else:
+ # backwards-compatible call to callback
+ self.config = self.callback(config, level)
if self.config is None:
msg = 'Reconfigure callback should return a possibly updated configuration.'
rospy.logerr(msg)
@astraw
Copy link
Author

astraw commented Aug 17, 2012

For comparison with a recent question on answers.ros.org, I just updated this to include my code as a patch file against the original.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment