Last active
November 4, 2022 01:43
-
-
Save awesomebytes/920e3d3a78982356f95df75f68121ee4 to your computer and use it in GitHub Desktop.
Publish a default camera_info topic from an image topic
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import sys | |
import numpy as np | |
import cv2 | |
import rospy | |
from cv_bridge import CvBridge, CvBridgeError | |
from sensor_msgs.msg import CameraInfo, Image, CompressedImage | |
""" | |
Publish a default CameraInfo topic from an Image or CompressedImage topic | |
at a specified rate, optionally, provide fx and fy. | |
./default_camera_info_from_image_topic.py /my_camera/image_raw/compressed 30 | |
Author: Sammy Pfeiffer <sammypfeiffer at gmail.com> | |
""" | |
class DefaultCameraInfo(object): | |
def __init__(self, image_topic, rate, fx=None, fy=None): | |
rospy.loginfo("Initializing DefaultCameraInfo, waiting for image on topic {}".format(image_topic)) | |
self.bridge = CvBridge() | |
# Get an image from the topic | |
if image_topic.endswith('compressed'): | |
img_compressed = rospy.wait_for_message(image_topic, CompressedImage) | |
np_arr = np.fromstring(img_compressed.data, np.uint8) | |
img_cv2 = cv2.imdecode(np_arr, cv2.IMREAD_UNCHANGED) | |
img = self.bridge.cv2_to_imgmsg(img_cv2) | |
img.header = img_compressed.header | |
image_topic = image_topic.replace('/compressed', '') | |
else: | |
img = rospy.wait_for_message(image_topic, Image) | |
camera_info_topic = '/'.join(image_topic.split('/')[:-1]) + '/camera_info' | |
rospy.loginfo("Got an image, creating default CameraInfo message") | |
self.camera_info = self.create_default_camera_info(img, fx, fy) | |
rospy.loginfo("Created: {}".format(self.camera_info)) | |
self.pub = rospy.Publisher(camera_info_topic, CameraInfo, queue_size=1) | |
# Most applications want synchronized camera_info's with the images | |
# having an image subscriber just for this is an abuse of resources | |
# so instead we just publish at the provided fixed rate | |
r = rospy.Rate(rate) | |
while not rospy.is_shutdown(): | |
self.pub.publish(self.camera_info) | |
r.sleep() | |
def create_default_camera_info(self, img, fx, fy): | |
if fx is None: | |
fx = img.height / 2.0 | |
if fy is None: | |
fy = img.width / 2.0 | |
cx = img.width / 2.0 | |
cy = img.height / 2.0 | |
ci = CameraInfo() | |
ci.header.frame_id = img.header.frame_id | |
ci.height = img.height | |
ci.width = img.width | |
ci.distortion_model = 'plumb_bomb' | |
ci.D = [0.0] * 5 | |
ci.K = [fx, 0.0, cx, | |
0.0, fy, cy, | |
0.0, 0.0, 1.0] | |
ci.R = [1.0, 0.0, 0.0, | |
0.0, 1.0, 0.0, | |
0.0, 0.0, 1.0] | |
ci.P = [fx, 0.0, cx, 0.0, | |
0.0, fy, cy, 0.0, | |
0.0, 0.0, 1.0, 0.0] | |
return ci | |
if __name__ == '__main__': | |
args = rospy.myargv(sys.argv) | |
if len(args) < 2: | |
print("Usage:") | |
print(args[0] + "<Image/CompressedImage topic> <Hz of publication> [fx fy]") | |
print("\nE.g.:") | |
print(args[0] + " /my_camera/image_raw 30") | |
print(args[0] + " /my_camera/image_raw/compressed 60") | |
print(args[0] + " /my_camera/image_raw/compressed 60 ") | |
exit(1) | |
image_topic = args[1] | |
rate = int(args[2]) | |
if len(args) == 5: | |
fx = float(args[3]) | |
fy = float(args[4]) | |
else: | |
fx = fy = None | |
node_name = 'default_camera_info_' + image_topic.replace('/', '_') | |
rospy.init_node(node_name) | |
dci = DefaultCameraInfo(image_topic, rate, fx, fy) | |
rospy.spin() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment