Skip to content

Instantly share code, notes, and snippets.

@jensenb
Created November 4, 2013 14:35
Show Gist options
  • Save jensenb/7303362 to your computer and use it in GitHub Desktop.
Save jensenb/7303362 to your computer and use it in GitHub Desktop.
ROS Python code for publishing synchronized stereo image pairs together with camera information. The script expects that the images are properly prefixed and sequenced, i.e. Left_01.png, Left_02.png, and that image files with the same sequence number correspond to one another. Most of the parameters can be set over the command line, including th…
#!/usr/bin/env python
# simple script to push a series of stereo images in a loop
from docutils.nodes import paragraph
import cv2
import numpy as np
import rospy
from sensor_msgs.msg import CameraInfo, Image
import time
import yaml
#change these to fit the expected topic names
IMAGE_MESSAGE_TOPIC = 'image_color'
CAMERA_MESSAGE_TOPIC = 'cam_info'
def parse_calibration_yaml(calib_file):
with file(calib_file, 'r') as f:
params = yaml.load(f)
cam_info = CameraInfo()
cam_info.height = params['image_height']
cam_info.width = params['image_width']
cam_info.distortion_model = params['distortion_model']
cam_info.K = params['camera_matrix']['data']
cam_info.D = params['distortion_coefficients']['data']
cam_info.R = params['rectification_matrix']['data']
cam_info.P = params['projection_matrix']['data']
return cam_info
def publish_stereo_sequence(left_images, right_images, left_cam_info, right_cam_info, left_img_pub, right_img_pub,
left_cam_pub, right_cam_pub, rate=1.0):
#iterate over all stereo image pairs
for left_image, right_image in zip(left_images, right_images):
#use the current time as message time stamp for all messages
stamp = rospy.Time.from_sec(time.time())
left_cam_info.header.stamp = stamp
right_cam_info.header.stamp = stamp
#publish the camera info messages first
left_cam_pub.publish(left_cam_info)
right_cam_pub.publish(right_cam_info)
left_img_msg = Image()
left_img_msg.height = left_image.shape[0]
left_img_msg.width = left_image.shape[1]
left_img_msg.step = left_image.strides[0]
left_img_msg.encoding = 'bgr8'
left_img_msg.header.frame_id = 'image_rect'
left_img_msg.header.stamp = stamp
left_img_msg.data = left_image.flatten().tolist()
right_img_msg = Image()
right_img_msg.height = right_image.shape[0]
right_img_msg.width = right_image.shape[1]
right_img_msg.step = right_image.strides[0]
right_img_msg.encoding = 'bgr8'
right_img_msg.header.frame_id = 'image_rect'
right_img_msg.header.stamp = stamp
right_img_msg.data = right_image.flatten().tolist()
#publish the image pair
left_img_pub.publish(left_img_msg)
right_img_pub.publish(right_img_msg)
rospy.sleep(1.0/rate)
if __name__ == '__main__':
import argparse
import glob
parser = argparse.ArgumentParser(description='Publish a a sequence of stereo images.')
parser.add_argument('-l', help='The left image file glob pattern', metavar='left pattern', dest='left_pat',
default='left*')
parser.add_argument('-r', help='The right image file glob pattern', metavar='right pattern', dest='right_pat',
default='right*')
parser.add_argument('-cl', help='The left camera calibration parameters file', dest='left_calib',
metavar='calibration yaml file', required=True)
parser.add_argument('-cr', help='The right camera calibration parameters file', dest='right_calib',
metavar='calibraiton yaml file', required=True)
parser.add_argument('--left-topic', help='The topic name for the left camera', dest='left_topic',
metavar='topic name', default='left_camera')
parser.add_argument('--right-topic', help='The topic name for the right camera', dest='right_topic',
metavar='topic name', default='right_camera')
parser.add_argument('--rate', help='The publishing rate in HZ', dest='rate', default=1.0, type=float)
args = parser.parse_args()
#first we read in all the image files and store them in sorted lists
left_files = sorted(glob.glob(args.left_pat))
right_files = sorted(glob.glob(args.right_pat))
if len(left_files) != len(right_files):
print "Number of left files", len(left_files), "not equal to number of right files", len(right_files)
left_images = []
right_images = []
for left_file in left_files:
left_images.append(cv2.imread(left_file))
for right_file in right_files:
right_images.append(cv2.imread(right_file))
#then we parse the left and right camera calibration yaml files
left_cam_info = parse_calibration_yaml(args.left_calib)
right_cam_info = parse_calibration_yaml(args.right_calib)
#setup the publishers
left_img_pub = rospy.Publisher(args.left_topic + '/' + IMAGE_MESSAGE_TOPIC, Image)
left_cam_pub = rospy.Publisher(args.left_topic + '/' + CAMERA_MESSAGE_TOPIC, CameraInfo)
right_img_pub = rospy.Publisher(args.right_topic + '/' + IMAGE_MESSAGE_TOPIC, Image)
right_cam_pub = rospy.Publisher(args.right_topic + '/' + CAMERA_MESSAGE_TOPIC, CameraInfo)
rospy.init_node('stereo_pub')
#publish the synchronized messages until we are interrupted
try:
while not rospy.is_shutdown():
publish_stereo_sequence(left_images, right_images, left_cam_info, right_cam_info, left_img_pub,
right_img_pub, left_cam_pub, right_cam_pub, args.rate)
rospy.sleep(1.0/args.rate)
except rospy.ROSInterruptException:
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment