Skip to content

Instantly share code, notes, and snippets.

@awesomebytes
Created June 7, 2019 18:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save awesomebytes/b552ed36aeb532a3aadebecc425b3330 to your computer and use it in GitHub Desktop.
Save awesomebytes/b552ed36aeb532a3aadebecc425b3330 to your computer and use it in GitHub Desktop.
A node with Dynamic Reconfigure to re-publish an image topic with CameraInfo and with a custom TF transform (useful to quick-test calibrations by hand)
#!/usr/bin/env python
from copy import deepcopy
import sys
import math
import rospy
from sensor_msgs.msg import Image, CameraInfo
from tf.transformations import quaternion_from_euler
from ddynamic_reconfigure_python.ddynamic_reconfigure import DDynamicReconfigure
from tf import TransformBroadcaster
def default_camera_info_from_image(img):
cam_info_msg = CameraInfo()
cam_info_msg.header.frame_id = img.header.frame_id
cam_info_msg.height = img.height
cam_info_msg.width = img.width
cam_info_msg.distortion_model = "plumb_bob"
cam_info_msg.D = [0.0] * 5
cam_info_msg.K = [1.0, 0.0, img.width / 2.0,
0.0, 1.0, img.height / 2.0,
0.0, 0.0, 1.0]
cam_info_msg.R = [1.0, 0.0, 0.0,
0.0, 1.0, 0.0,
0.0, 0.0, 1.0]
cam_info_msg.P = [1.0, 0.0, img.width / 2.0, 0.0,
0.0, 1.0, img.height / 2.0, 0.0,
0.0, 0.0, 1.0, 0.0]
return cam_info_msg
class CameraRepublisherWithTF(object):
def __init__(self, image_topic_in, image_topic_out,
parent_frame_id,
children_frame_id,
camera_info_topic_out=None,
camera_info_topic_in=None,):
self.pub_img = rospy.Publisher(image_topic_out,
Image,
queue_size=1)
self.pub_cinfo = rospy.Publisher(camera_info_topic_out,
CameraInfo,
queue_size=1)
self.ddr = DDynamicReconfigure(
image_topic_out.replace('/', '_') + '_transform')
self.ddr.add_variable('x', 'x (m)', min=-10.0, max=10.0, default=0.0)
self.ddr.add_variable('y', 'y (m)', min=-10.0, max=10.0, default=0.0)
self.ddr.add_variable('z', 'z (m)', min=-10.0, max=10.0, default=0.0)
self.ddr.add_variable('roll', 'roll (degrees)',
min=-360.0, max=360.0, default=0.0)
self.ddr.add_variable('pitch', 'pitch (degrees)',
min=-360.0, max=360.0, default=0.0)
self.ddr.add_variable('yaw', 'yaw (degrees)',
min=-360.0, max=360.0, default=0.0)
self.ddr.add_variable('parent_frame_id',
'parent_frame_id for header_frame_id',
default=parent_frame_id)
self.ddr.add_variable('children_frame_id',
'children_frame_id',
default=children_frame_id)
self.ddr.add_variable('tf_rate',
'rate to publish tf (hz)',
min=0.0, max=1000.0, default=20.0)
self.add_variables_to_self()
self.ddr.start(self.dyn_rec_callback)
self.rate = rospy.Rate(self.tf_rate)
self.transform_broadcaster = TransformBroadcaster()
self.last_img = None
self.sub_img = rospy.Subscriber(image_topic_in,
Image,
self.img_cb,
queue_size=1)
self.cinfo_msg = None
if camera_info_topic_in:
self.sub_cinfo = rospy.Subscriber(camera_info_topic_in,
CameraInfo,
self.cinfo_cb,
queue_size=1)
else:
# make up a best-guess camera info
# from the first image that comes in
rospy.loginfo(
"Waiting for an image to create a CameraInfo image...")
while not rospy.is_shutdown() and self.last_img is None:
rospy.sleep(0.5)
rospy.loginfo("Got one!")
self.cinfo_msg = default_camera_info_from_image(self.last_img)
rospy.loginfo("CameraRepublisherWithTF ready!")
def publish_tf(self):
self.transform_broadcaster.sendTransform(
[self.x, self.y, self.z],
quaternion_from_euler(
math.radians(self.roll),
math.radians(self.pitch),
math.radians(self.yaw)),
rospy.Time.now(),
self.children_frame_id,
self.parent_frame_id)
def add_variables_to_self(self):
var_names = self.ddr.get_variable_names()
for var_name in var_names:
self.__setattr__(var_name, None)
def dyn_rec_callback(self, config, level):
rospy.loginfo("Received reconf call: " + str(config))
# Update all variables
var_names = self.ddr.get_variable_names()
for var_name in var_names:
self.__dict__[var_name] = config[var_name]
self.rate = rospy.Rate(self.tf_rate)
return config
def cinfo_cb(self, cinfo_msg):
self.cinfo_msg = cinfo_msg
def img_cb(self, img):
self.last_img = img
# Republish with the new frame_id
img.header.frame_id = self.children_frame_id
self.pub_img.publish(img)
# Also publish camera info accordingly
cinfo_tmp = deepcopy(self.cinfo_msg)
cinfo_tmp.header.stamp = img.header.stamp
cinfo_tmp.header.frame_id = self.children_frame_id
self.pub_cinfo.publish(cinfo_tmp)
def run(self):
while not rospy.is_shutdown():
# Publish TF at the desired rate
self.publish_tf()
self.rate.sleep()
# The images will be re-publishes as soon as they arrive
# Also a CameraInfo message will be published on every image pub
if __name__ == '__main__':
argv = rospy.myargv(sys.argv)
if len(argv) < 6 or len(argv) > 7:
print("Usage:")
print(argv[0] + " image_topic_OUT image_topic_IN parent_frame_id children_frame_id camera_info_topic_OUT [camera_info_topic_IN]")
print("\nE.g.:")
print(argv[0] + " /camera_new_frame/image_raw /camera/image_raw base_footprint camera_new_frame /camera_new_frame/camera_info")
exit(0)
rospy.init_node('camera_republisher')
image_topic_out = argv[1]
image_topic_in = argv[2]
parent_frame_id = argv[3]
children_frame_id = argv[4]
camera_info_topic_out = argv[5]
if len(argv) == 7:
camera_info_topic_in = argv[6]
else:
camera_info_topic_in = None
crwtf = CameraRepublisherWithTF(
image_topic_in=image_topic_in,
image_topic_out=image_topic_out,
parent_frame_id=parent_frame_id,
children_frame_id=children_frame_id,
camera_info_topic_out=camera_info_topic_out,
camera_info_topic_in=camera_info_topic_in)
crwtf.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment