Skip to content

Instantly share code, notes, and snippets.

@awesomebytes
Created November 12, 2018 05:49
Show Gist options
  • Star 14 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save awesomebytes/958a5ef9e63821a28dc05775840c34d9 to your computer and use it in GitHub Desktop.
Save awesomebytes/958a5ef9e63821a28dc05775840c34d9 to your computer and use it in GitHub Desktop.
ImageTools is a class to deal with conversions from & to ROS Image, ROS CompressedImage and numpy.ndarray (cv2 image).
#!/usr/bin/env python
import numpy as np
import cv2
from cv_bridge import CvBridge, CvBridgeError
"""
ROS ImageTools, a class that contains methods to transform
from & to ROS Image, ROS CompressedImage & numpy.ndarray (cv2 image).
Also deals with Depth images, with a tricky catch, as they are compressed in
PNG, and we are here hardcoding to compression level 3 and the default
quantizations of the plugin. (What we use in our robots).
Meanwhile image_transport has no Python interface, this is the best we can do.
Author: Sammy Pfeiffer <Sammy.Pfeiffer at student.uts.edu.au>
"""
class ImageTools(object):
def __init__(self):
self._cv_bridge = CvBridge()
def convert_ros_msg_to_cv2(self, ros_data, image_encoding='bgr8'):
"""
Convert from a ROS Image message to a cv2 image.
"""
try:
return self._cv_bridge.imgmsg_to_cv2(ros_data, image_encoding)
except CvBridgeError as e:
if "[16UC1] is not a color format" in str(e):
raise CvBridgeError(
"You may be trying to use a Image method " +
"(Subscriber, Publisher, conversion) on a depth image" +
" message. Original exception: " + str(e))
raise e
def convert_ros_compressed_to_cv2(self, compressed_msg):
np_arr = np.fromstring(compressed_msg.data, np.uint8)
return cv2.imdecode(np_arr, cv2.IMREAD_COLOR)
def convert_ros_compressed_msg_to_ros_msg(self, compressed_msg,
encoding='bgr8'):
cv2_img = self.convert_ros_compressed_to_cv2(compressed_msg)
ros_img = self._cv_bridge.cv2_to_imgmsg(cv2_img, encoding=encoding)
ros_img.header = compressed_msg.header
return ros_img
def convert_cv2_to_ros_msg(self, cv2_data, image_encoding='bgr8'):
"""
Convert from a cv2 image to a ROS Image message.
"""
return self._cv_bridge.cv2_to_imgmsg(cv2_data, image_encoding)
def convert_cv2_to_ros_compressed_msg(self, cv2_data,
compressed_format='jpg'):
"""
Convert from cv2 image to ROS CompressedImage.
"""
return self._cv_bridge.cv2_to_compressed_imgmsg(cv2_data,
dst_format=compressed_format)
def convert_ros_msg_to_ros_compressed_msg(self, image,
image_encoding='bgr8',
compressed_format="jpg"):
"""
Convert from ROS Image message to ROS CompressedImage.
"""
cv2_img = self.convert_ros_msg_to_cv2(image, image_encoding)
cimg_msg = self._cv_bridge.cv2_to_compressed_imgmsg(cv2_img,
dst_format=compressed_format)
cimg_msg.header = image.header
return cimg_msg
def convert_to_cv2(self, image):
"""
Convert any kind of image to cv2.
"""
cv2_img = None
if type(image) == np.ndarray:
cv2_img = image
elif image._type == 'sensor_msgs/Image':
cv2_img = self.convert_ros_msg_to_cv2(image)
elif image._type == 'sensor_msgs/CompressedImage':
cv2_img = self.convert_ros_compressed_to_cv2(image)
else:
raise TypeError("Cannot convert type: " + str(type(image)))
return cv2_img
def convert_to_ros_msg(self, image):
"""
Convert any kind of image to ROS Image.
"""
ros_msg = None
if type(image) == np.ndarray:
ros_msg = self.convert_cv2_to_ros_msg(image)
elif image._type == 'sensor_msgs/Image':
ros_msg = image
elif image._type == 'sensor_msgs/CompressedImage':
ros_msg = self.convert_ros_compressed_msg_to_ros_msg(image)
else:
raise TypeError("Cannot convert type: " + str(type(image)))
return ros_msg
def convert_to_ros_compressed_msg(self, image, compressed_format='jpg'):
"""
Convert any kind of image to ROS Compressed Image.
"""
ros_cmp = None
if type(image) == np.ndarray:
ros_cmp = self.convert_cv2_to_ros_compressed_msg(
image, compressed_format=compressed_format)
elif image._type == 'sensor_msgs/Image':
ros_cmp = self.convert_ros_msg_to_ros_compressed_msg(
image, compressed_format=compressed_format)
elif image._type == 'sensor_msgs/CompressedImage':
ros_cmp = image
else:
raise TypeError("Cannot convert type: " + str(type(image)))
return ros_cmp
def convert_depth_to_ros_msg(self, image):
ros_msg = None
if type(image) == np.ndarray:
ros_msg = self.convert_cv2_to_ros_msg(image,
image_encoding='mono16')
elif image._type == 'sensor_msgs/Image':
image.encoding = '16UC1'
ros_msg = image
elif image._type == 'sensor_msgs/CompressedImage':
ros_msg = self.convert_compressedDepth_to_image_msg(image)
else:
raise TypeError("Cannot convert type: " + str(type(image)))
return ros_msg
def convert_depth_to_ros_compressed_msg(self, image):
ros_cmp = None
if type(image) == np.ndarray:
ros_cmp = self.convert_cv2_to_ros_compressed_msg(image,
compressed_format='png')
ros_cmp.format = '16UC1; compressedDepth'
# This is a header ROS depth CompressedImage have, necessary
# for viewer tools to see the image
# extracted from a real image from a robot
# The code that does it in C++ is this:
# https://github.com/ros-perception/image_transport_plugins/blob/indigo-devel/compressed_depth_image_transport/src/codec.cpp
ros_cmp.data = "\x00\x00\x00\x00\x88\x9c\x5c\xaa\x00\x40\x4b\xb7" + ros_cmp.data
elif image._type == 'sensor_msgs/Image':
image.encoding = "mono16"
ros_cmp = self.convert_ros_msg_to_ros_compressed_msg(
image,
image_encoding='mono16',
compressed_format='png')
ros_cmp.format = '16UC1; compressedDepth'
ros_cmp.data = "\x00\x00\x00\x00\x88\x9c\x5c\xaa\x00\x40\x4b\xb7" + ros_cmp.data
elif image._type == 'sensor_msgs/CompressedImage':
ros_cmp = image
else:
raise TypeError("Cannot convert type: " + str(type(image)))
return ros_cmp
def convert_depth_to_cv2(self, image):
cv2_img = None
if type(image) == np.ndarray:
cv2_img = image
elif image._type == 'sensor_msgs/Image':
image.encoding = 'mono16'
cv2_img = self.convert_ros_msg_to_cv2(image,
image_encoding='mono16')
elif image._type == 'sensor_msgs/CompressedImage':
cv2_img = self.convert_compressedDepth_to_cv2(image)
else:
raise TypeError("Cannot convert type: " + str(type(image)))
return cv2_img
def convert_compressedDepth_to_image_msg(self, compressed_image):
"""
Convert a compressedDepth topic image into a ROS Image message.
compressed_image must be from a topic /bla/compressedDepth
as it's encoded in PNG
Code from: https://answers.ros.org/question/249775/display-compresseddepth-image-python-cv2/
"""
depth_img_raw = self.convert_compressedDepth_to_cv2(compressed_image)
img_msg = self._cv_bridge.cv2_to_imgmsg(depth_img_raw, "mono16")
img_msg.header = compressed_image.header
img_msg.encoding = "16UC1"
return img_msg
def convert_compressedDepth_to_cv2(self, compressed_depth):
"""
Convert a compressedDepth topic image into a cv2 image.
compressed_depth must be from a topic /bla/compressedDepth
as it's encoded in PNG
Code from: https://answers.ros.org/question/249775/display-compresseddepth-image-python-cv2/
"""
depth_fmt, compr_type = compressed_depth.format.split(';')
# remove white space
depth_fmt = depth_fmt.strip()
compr_type = compr_type.strip()
if compr_type != "compressedDepth":
raise Exception("Compression type is not 'compressedDepth'."
"You probably subscribed to the wrong topic.")
# remove header from raw data, if necessary
if 'PNG' in compressed_depth.data[:12]:
# If we compressed it with opencv, there is nothing to strip
depth_header_size = 0
else:
# If it comes from a robot/sensor, it has 12 useless bytes apparently
depth_header_size = 12
raw_data = compressed_depth.data[depth_header_size:]
depth_img_raw = cv2.imdecode(np.frombuffer(raw_data, np.uint8),
# the cv2.CV_LOAD_IMAGE_UNCHANGED has been removed
-1) # cv2.CV_LOAD_IMAGE_UNCHANGED)
if depth_img_raw is None:
# probably wrong header size
raise Exception("Could not decode compressed depth image."
"You may need to change 'depth_header_size'!")
return depth_img_raw
def display_image(self, image):
"""
Use cv2 to show an image.
"""
cv2_img = self.convert_to_cv2(image)
window_name = 'show_image press q to exit'
cv2.imshow(window_name, cv2_img)
# TODO: Figure out how to check if the window
# was closed... when a user does it, the program is stuck
key = cv2.waitKey(0)
if chr(key) == 'q':
cv2.destroyWindow(window_name)
def save_image(self, image, filename):
"""
Given an image in numpy array or ROS format
save it using cv2 to the filename. The extension
declares the type of image (e.g. .jpg .png).
"""
cv2_img = self.convert_to_cv2(image)
cv2.imwrite(filename, cv2_img)
def save_depth_image(self, image, filename):
"""
Save a normalized (easier to visualize) version
of a depth image into a file.
"""
# Duc's smart undocummented code
im_array = self.convert_depth_to_cv2(image)
min_distance, max_distance = np.min(im_array), np.max(im_array)
im_array = im_array * 1.0
im_array = (im_array < max_distance) * im_array
im_array = (im_array - min_distance) / max_distance * 255.0
im_array = (im_array >= 0) * im_array
cv2.imwrite(filename, im_array)
def load_from_file(self, file_path, cv2_imread_mode=None):
"""
Load image from a file.
:param file_path str: Path to the image file.
:param cv2_imread_mode int: cv2.IMREAD_ mode, modes are:
cv2.IMREAD_ANYCOLOR 4 cv2.IMREAD_REDUCED_COLOR_4 33
cv2.IMREAD_ANYDEPTH 2 cv2.IMREAD_REDUCED_COLOR_8 65
cv2.IMREAD_COLOR 1 cv2.IMREAD_REDUCED_GRAYSCALE_2 16
cv2.IMREAD_GRAYSCALE 0 cv2.IMREAD_REDUCED_GRAYSCALE_4 32
cv2.IMREAD_IGNORE_ORIENTATION 128 cv2.IMREAD_REDUCED_GRAYSCALE_8 64
cv2.IMREAD_LOAD_GDAL 8 cv2.IMREAD_UNCHANGED -1
cv2.IMREAD_REDUCED_COLOR_2 17
"""
if cv2_imread_mode is not None:
img = cv2.imread(file_path, cv2_imread_mode)
img = cv2.imread(file_path)
if img is None:
raise RuntimeError("No image found to load at " + str(file_path))
return img
if __name__ == '__main__':
from sensor_msgs.msg import Image, CompressedImage
it = ImageTools()
from cPickle import load
img = load(open('rgb_image.pickle', 'r'))
cv2_img = it.convert_ros_msg_to_cv2(img)
print(type(cv2_img))
ros_img = it.convert_cv2_to_ros_msg(cv2_img)
print(type(ros_img))
ros_compressed2 = it.convert_cv2_to_ros_compressed_msg(cv2_img)
print(type(ros_compressed2))
compressed_ros_img = it.convert_ros_msg_to_ros_compressed_msg(ros_img)
print(type(compressed_ros_img))
ros_img2 = it.convert_ros_compressed_msg_to_ros_msg(compressed_ros_img)
print(type(ros_img2))
cv2_2 = it.convert_ros_compressed_to_cv2(compressed_ros_img)
print(type(cv2_2))
cv2_3 = it.convert_to_cv2(cv2_img)
cv2_4 = it.convert_to_cv2(ros_img)
cv2_5 = it.convert_to_cv2(compressed_ros_img)
ros_3 = it.convert_to_ros_msg(cv2_img)
ros_4 = it.convert_to_ros_msg(ros_img)
ros_5 = it.convert_to_ros_msg(compressed_ros_img)
rosc_3 = it.convert_to_ros_compressed_msg(cv2_img)
rosc_4 = it.convert_to_ros_compressed_msg(ros_img)
rosc_5 = it.convert_to_ros_compressed_msg(compressed_ros_img)
depthcompimg = load(open('depth_compressed_image.pickle', 'r'))
depth_cv2 = it.convert_depth_to_cv2(depthcompimg)
depth_ros = it.convert_depth_to_ros_msg(depthcompimg)
depth_ros_comp = it.convert_depth_to_ros_compressed_msg(depthcompimg)
it.save_image(depth_cv2, 'depth_comp_cv2.jpg')
it.save_depth_image(depth_cv2, 'depth_comp_normalized_cv2.jpg')
depthimg = load(open('depth_image.pickle', 'r'))
depthimg_cv2 = it.convert_depth_to_cv2(depthimg)
depthimg_ros = it.convert_depth_to_ros_msg(depthimg)
depthimg_ros_comp = it.convert_depth_to_ros_compressed_msg(depthimg)
it.save_image(depth_cv2, 'depth_cv2.jpg')
it.save_depth_image(depth_cv2, 'depth_normalized_cv2.jpg')
it.save_image(cv2_img, 'cv2_image.jpg')
it.save_image(ros_img, 'ros_image.jpg')
it.save_image(compressed_ros_img, 'compressed_ros_image.jpg')
it.display_image(cv2_img)
@lucasw
Copy link

lucasw commented Oct 9, 2022

I think I figured it out here https://answers.ros.org/question/407659/how-to-generate-format-32fc1-compresseddepth-png-image-in-python-from-float-array/ - and it turned out it isn't actually 32fc1 despite the format name, just uint16 with a scale and offset (and the depth values get inverted)

@awesomebytes
Copy link
Author

That's an odd format! Thanks for figuring it out. I'm going to quote here your reply there just for the sake of conservation of information:

Here is what I came up with- I haven't yet checked to make sure the depth values when decompressed are correct, they just seemed reasonable as seen in rqt:

import cv2
import numpy as np
from sensor_msgs.msg import CompressedImage


def encode_compressed_depth_image_msg(depth_image: np.array,
                                      depth_min=1.0, depth_max=10.0,
                                      depth_quantization=100.0):
    depth_image[depth_image > depth_max] = np.nan
    depth_image[depth_image < depth_min] = np.nan
    depth_z0 = depth_quantization
    depth_quant_a = depth_z0 * (depth_z0 + 1.0)
    depth_quant_b = 1.0 - depth_quant_a / depth_max
    inv_depth = (depth_quant_a / depth_image + depth_quant_b).astype(np.uint16)
    depth_encoded = cv2.imencode(".png", inv_depth)[1]
    header = np.array([0.0, depth_quant_a, depth_quant_b], np.float32)

    compressed_depth_msg = CompressedImage()
    compressed_depth_msg.format = "32FC1; compressedDepth png"
    compressed_depth_msg.data = header.tobytes() + depth_encoded.tobytes()
    return compressed_depth_msg

The payload of the CompressedDepth 32FC1; compressedDepth png is 12 header bytes which contains the conversion scaling numbers quant a & b, then an encoded 16-bit grayscale png follows (if you lop off those first 12 bytes and save the rest of the bytes to a file you can display it in a png viewer and see the depth values). The scaling numbers fixed above but could be changed per-frame if there was a reason to, by manipulating depth_max and depth_quantization (or refactor so depth_quant_a and b are set through some other means).

@Mechazo11
Copy link

@awesomebytes thank you very much for publishing these conversion tools. For 2D RGB images, they work really well.

@awesomebytes
Copy link
Author

@Mechazo11 You are welcome! I'm very happy to know my snippets help people! :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment