Skip to content

Instantly share code, notes, and snippets.

@awesomebytes
Last active November 4, 2022 01:43
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save awesomebytes/920e3d3a78982356f95df75f68121ee4 to your computer and use it in GitHub Desktop.
Save awesomebytes/920e3d3a78982356f95df75f68121ee4 to your computer and use it in GitHub Desktop.
Publish a default camera_info topic from an image topic
#!/usr/bin/env python
import sys
import numpy as np
import cv2
import rospy
from cv_bridge import CvBridge, CvBridgeError
from sensor_msgs.msg import CameraInfo, Image, CompressedImage
"""
Publish a default CameraInfo topic from an Image or CompressedImage topic
at a specified rate, optionally, provide fx and fy.
./default_camera_info_from_image_topic.py /my_camera/image_raw/compressed 30
Author: Sammy Pfeiffer <sammypfeiffer at gmail.com>
"""
class DefaultCameraInfo(object):
def __init__(self, image_topic, rate, fx=None, fy=None):
rospy.loginfo("Initializing DefaultCameraInfo, waiting for image on topic {}".format(image_topic))
self.bridge = CvBridge()
# Get an image from the topic
if image_topic.endswith('compressed'):
img_compressed = rospy.wait_for_message(image_topic, CompressedImage)
np_arr = np.fromstring(img_compressed.data, np.uint8)
img_cv2 = cv2.imdecode(np_arr, cv2.IMREAD_UNCHANGED)
img = self.bridge.cv2_to_imgmsg(img_cv2)
img.header = img_compressed.header
image_topic = image_topic.replace('/compressed', '')
else:
img = rospy.wait_for_message(image_topic, Image)
camera_info_topic = '/'.join(image_topic.split('/')[:-1]) + '/camera_info'
rospy.loginfo("Got an image, creating default CameraInfo message")
self.camera_info = self.create_default_camera_info(img, fx, fy)
rospy.loginfo("Created: {}".format(self.camera_info))
self.pub = rospy.Publisher(camera_info_topic, CameraInfo, queue_size=1)
# Most applications want synchronized camera_info's with the images
# having an image subscriber just for this is an abuse of resources
# so instead we just publish at the provided fixed rate
r = rospy.Rate(rate)
while not rospy.is_shutdown():
self.pub.publish(self.camera_info)
r.sleep()
def create_default_camera_info(self, img, fx, fy):
if fx is None:
fx = img.height / 2.0
if fy is None:
fy = img.width / 2.0
cx = img.width / 2.0
cy = img.height / 2.0
ci = CameraInfo()
ci.header.frame_id = img.header.frame_id
ci.height = img.height
ci.width = img.width
ci.distortion_model = 'plumb_bomb'
ci.D = [0.0] * 5
ci.K = [fx, 0.0, cx,
0.0, fy, cy,
0.0, 0.0, 1.0]
ci.R = [1.0, 0.0, 0.0,
0.0, 1.0, 0.0,
0.0, 0.0, 1.0]
ci.P = [fx, 0.0, cx, 0.0,
0.0, fy, cy, 0.0,
0.0, 0.0, 1.0, 0.0]
return ci
if __name__ == '__main__':
args = rospy.myargv(sys.argv)
if len(args) < 2:
print("Usage:")
print(args[0] + "<Image/CompressedImage topic> <Hz of publication> [fx fy]")
print("\nE.g.:")
print(args[0] + " /my_camera/image_raw 30")
print(args[0] + " /my_camera/image_raw/compressed 60")
print(args[0] + " /my_camera/image_raw/compressed 60 ")
exit(1)
image_topic = args[1]
rate = int(args[2])
if len(args) == 5:
fx = float(args[3])
fy = float(args[4])
else:
fx = fy = None
node_name = 'default_camera_info_' + image_topic.replace('/', '_')
rospy.init_node(node_name)
dci = DefaultCameraInfo(image_topic, rate, fx, fy)
rospy.spin()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment