-
-
Save awesomebytes/958a5ef9e63821a28dc05775840c34d9 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python | |
import numpy as np | |
import cv2 | |
from cv_bridge import CvBridge, CvBridgeError | |
""" | |
ROS ImageTools, a class that contains methods to transform | |
from & to ROS Image, ROS CompressedImage & numpy.ndarray (cv2 image). | |
Also deals with Depth images, with a tricky catch, as they are compressed in | |
PNG, and we are here hardcoding to compression level 3 and the default | |
quantizations of the plugin. (What we use in our robots). | |
Meanwhile image_transport has no Python interface, this is the best we can do. | |
Author: Sammy Pfeiffer <Sammy.Pfeiffer at student.uts.edu.au> | |
""" | |
class ImageTools(object): | |
def __init__(self): | |
self._cv_bridge = CvBridge() | |
def convert_ros_msg_to_cv2(self, ros_data, image_encoding='bgr8'): | |
""" | |
Convert from a ROS Image message to a cv2 image. | |
""" | |
try: | |
return self._cv_bridge.imgmsg_to_cv2(ros_data, image_encoding) | |
except CvBridgeError as e: | |
if "[16UC1] is not a color format" in str(e): | |
raise CvBridgeError( | |
"You may be trying to use a Image method " + | |
"(Subscriber, Publisher, conversion) on a depth image" + | |
" message. Original exception: " + str(e)) | |
raise e | |
def convert_ros_compressed_to_cv2(self, compressed_msg): | |
np_arr = np.fromstring(compressed_msg.data, np.uint8) | |
return cv2.imdecode(np_arr, cv2.IMREAD_COLOR) | |
def convert_ros_compressed_msg_to_ros_msg(self, compressed_msg, | |
encoding='bgr8'): | |
cv2_img = self.convert_ros_compressed_to_cv2(compressed_msg) | |
ros_img = self._cv_bridge.cv2_to_imgmsg(cv2_img, encoding=encoding) | |
ros_img.header = compressed_msg.header | |
return ros_img | |
def convert_cv2_to_ros_msg(self, cv2_data, image_encoding='bgr8'): | |
""" | |
Convert from a cv2 image to a ROS Image message. | |
""" | |
return self._cv_bridge.cv2_to_imgmsg(cv2_data, image_encoding) | |
def convert_cv2_to_ros_compressed_msg(self, cv2_data, | |
compressed_format='jpg'): | |
""" | |
Convert from cv2 image to ROS CompressedImage. | |
""" | |
return self._cv_bridge.cv2_to_compressed_imgmsg(cv2_data, | |
dst_format=compressed_format) | |
def convert_ros_msg_to_ros_compressed_msg(self, image, | |
image_encoding='bgr8', | |
compressed_format="jpg"): | |
""" | |
Convert from ROS Image message to ROS CompressedImage. | |
""" | |
cv2_img = self.convert_ros_msg_to_cv2(image, image_encoding) | |
cimg_msg = self._cv_bridge.cv2_to_compressed_imgmsg(cv2_img, | |
dst_format=compressed_format) | |
cimg_msg.header = image.header | |
return cimg_msg | |
def convert_to_cv2(self, image): | |
""" | |
Convert any kind of image to cv2. | |
""" | |
cv2_img = None | |
if type(image) == np.ndarray: | |
cv2_img = image | |
elif image._type == 'sensor_msgs/Image': | |
cv2_img = self.convert_ros_msg_to_cv2(image) | |
elif image._type == 'sensor_msgs/CompressedImage': | |
cv2_img = self.convert_ros_compressed_to_cv2(image) | |
else: | |
raise TypeError("Cannot convert type: " + str(type(image))) | |
return cv2_img | |
def convert_to_ros_msg(self, image): | |
""" | |
Convert any kind of image to ROS Image. | |
""" | |
ros_msg = None | |
if type(image) == np.ndarray: | |
ros_msg = self.convert_cv2_to_ros_msg(image) | |
elif image._type == 'sensor_msgs/Image': | |
ros_msg = image | |
elif image._type == 'sensor_msgs/CompressedImage': | |
ros_msg = self.convert_ros_compressed_msg_to_ros_msg(image) | |
else: | |
raise TypeError("Cannot convert type: " + str(type(image))) | |
return ros_msg | |
def convert_to_ros_compressed_msg(self, image, compressed_format='jpg'): | |
""" | |
Convert any kind of image to ROS Compressed Image. | |
""" | |
ros_cmp = None | |
if type(image) == np.ndarray: | |
ros_cmp = self.convert_cv2_to_ros_compressed_msg( | |
image, compressed_format=compressed_format) | |
elif image._type == 'sensor_msgs/Image': | |
ros_cmp = self.convert_ros_msg_to_ros_compressed_msg( | |
image, compressed_format=compressed_format) | |
elif image._type == 'sensor_msgs/CompressedImage': | |
ros_cmp = image | |
else: | |
raise TypeError("Cannot convert type: " + str(type(image))) | |
return ros_cmp | |
def convert_depth_to_ros_msg(self, image): | |
ros_msg = None | |
if type(image) == np.ndarray: | |
ros_msg = self.convert_cv2_to_ros_msg(image, | |
image_encoding='mono16') | |
elif image._type == 'sensor_msgs/Image': | |
image.encoding = '16UC1' | |
ros_msg = image | |
elif image._type == 'sensor_msgs/CompressedImage': | |
ros_msg = self.convert_compressedDepth_to_image_msg(image) | |
else: | |
raise TypeError("Cannot convert type: " + str(type(image))) | |
return ros_msg | |
def convert_depth_to_ros_compressed_msg(self, image): | |
ros_cmp = None | |
if type(image) == np.ndarray: | |
ros_cmp = self.convert_cv2_to_ros_compressed_msg(image, | |
compressed_format='png') | |
ros_cmp.format = '16UC1; compressedDepth' | |
# This is a header ROS depth CompressedImage have, necessary | |
# for viewer tools to see the image | |
# extracted from a real image from a robot | |
# The code that does it in C++ is this: | |
# https://github.com/ros-perception/image_transport_plugins/blob/indigo-devel/compressed_depth_image_transport/src/codec.cpp | |
ros_cmp.data = "\x00\x00\x00\x00\x88\x9c\x5c\xaa\x00\x40\x4b\xb7" + ros_cmp.data | |
elif image._type == 'sensor_msgs/Image': | |
image.encoding = "mono16" | |
ros_cmp = self.convert_ros_msg_to_ros_compressed_msg( | |
image, | |
image_encoding='mono16', | |
compressed_format='png') | |
ros_cmp.format = '16UC1; compressedDepth' | |
ros_cmp.data = "\x00\x00\x00\x00\x88\x9c\x5c\xaa\x00\x40\x4b\xb7" + ros_cmp.data | |
elif image._type == 'sensor_msgs/CompressedImage': | |
ros_cmp = image | |
else: | |
raise TypeError("Cannot convert type: " + str(type(image))) | |
return ros_cmp | |
def convert_depth_to_cv2(self, image): | |
cv2_img = None | |
if type(image) == np.ndarray: | |
cv2_img = image | |
elif image._type == 'sensor_msgs/Image': | |
image.encoding = 'mono16' | |
cv2_img = self.convert_ros_msg_to_cv2(image, | |
image_encoding='mono16') | |
elif image._type == 'sensor_msgs/CompressedImage': | |
cv2_img = self.convert_compressedDepth_to_cv2(image) | |
else: | |
raise TypeError("Cannot convert type: " + str(type(image))) | |
return cv2_img | |
def convert_compressedDepth_to_image_msg(self, compressed_image): | |
""" | |
Convert a compressedDepth topic image into a ROS Image message. | |
compressed_image must be from a topic /bla/compressedDepth | |
as it's encoded in PNG | |
Code from: https://answers.ros.org/question/249775/display-compresseddepth-image-python-cv2/ | |
""" | |
depth_img_raw = self.convert_compressedDepth_to_cv2(compressed_image) | |
img_msg = self._cv_bridge.cv2_to_imgmsg(depth_img_raw, "mono16") | |
img_msg.header = compressed_image.header | |
img_msg.encoding = "16UC1" | |
return img_msg | |
def convert_compressedDepth_to_cv2(self, compressed_depth): | |
""" | |
Convert a compressedDepth topic image into a cv2 image. | |
compressed_depth must be from a topic /bla/compressedDepth | |
as it's encoded in PNG | |
Code from: https://answers.ros.org/question/249775/display-compresseddepth-image-python-cv2/ | |
""" | |
depth_fmt, compr_type = compressed_depth.format.split(';') | |
# remove white space | |
depth_fmt = depth_fmt.strip() | |
compr_type = compr_type.strip() | |
if compr_type != "compressedDepth": | |
raise Exception("Compression type is not 'compressedDepth'." | |
"You probably subscribed to the wrong topic.") | |
# remove header from raw data, if necessary | |
if 'PNG' in compressed_depth.data[:12]: | |
# If we compressed it with opencv, there is nothing to strip | |
depth_header_size = 0 | |
else: | |
# If it comes from a robot/sensor, it has 12 useless bytes apparently | |
depth_header_size = 12 | |
raw_data = compressed_depth.data[depth_header_size:] | |
depth_img_raw = cv2.imdecode(np.frombuffer(raw_data, np.uint8), | |
# the cv2.CV_LOAD_IMAGE_UNCHANGED has been removed | |
-1) # cv2.CV_LOAD_IMAGE_UNCHANGED) | |
if depth_img_raw is None: | |
# probably wrong header size | |
raise Exception("Could not decode compressed depth image." | |
"You may need to change 'depth_header_size'!") | |
return depth_img_raw | |
def display_image(self, image): | |
""" | |
Use cv2 to show an image. | |
""" | |
cv2_img = self.convert_to_cv2(image) | |
window_name = 'show_image press q to exit' | |
cv2.imshow(window_name, cv2_img) | |
# TODO: Figure out how to check if the window | |
# was closed... when a user does it, the program is stuck | |
key = cv2.waitKey(0) | |
if chr(key) == 'q': | |
cv2.destroyWindow(window_name) | |
def save_image(self, image, filename): | |
""" | |
Given an image in numpy array or ROS format | |
save it using cv2 to the filename. The extension | |
declares the type of image (e.g. .jpg .png). | |
""" | |
cv2_img = self.convert_to_cv2(image) | |
cv2.imwrite(filename, cv2_img) | |
def save_depth_image(self, image, filename): | |
""" | |
Save a normalized (easier to visualize) version | |
of a depth image into a file. | |
""" | |
# Duc's smart undocummented code | |
im_array = self.convert_depth_to_cv2(image) | |
min_distance, max_distance = np.min(im_array), np.max(im_array) | |
im_array = im_array * 1.0 | |
im_array = (im_array < max_distance) * im_array | |
im_array = (im_array - min_distance) / max_distance * 255.0 | |
im_array = (im_array >= 0) * im_array | |
cv2.imwrite(filename, im_array) | |
def load_from_file(self, file_path, cv2_imread_mode=None): | |
""" | |
Load image from a file. | |
:param file_path str: Path to the image file. | |
:param cv2_imread_mode int: cv2.IMREAD_ mode, modes are: | |
cv2.IMREAD_ANYCOLOR 4 cv2.IMREAD_REDUCED_COLOR_4 33 | |
cv2.IMREAD_ANYDEPTH 2 cv2.IMREAD_REDUCED_COLOR_8 65 | |
cv2.IMREAD_COLOR 1 cv2.IMREAD_REDUCED_GRAYSCALE_2 16 | |
cv2.IMREAD_GRAYSCALE 0 cv2.IMREAD_REDUCED_GRAYSCALE_4 32 | |
cv2.IMREAD_IGNORE_ORIENTATION 128 cv2.IMREAD_REDUCED_GRAYSCALE_8 64 | |
cv2.IMREAD_LOAD_GDAL 8 cv2.IMREAD_UNCHANGED -1 | |
cv2.IMREAD_REDUCED_COLOR_2 17 | |
""" | |
if cv2_imread_mode is not None: | |
img = cv2.imread(file_path, cv2_imread_mode) | |
img = cv2.imread(file_path) | |
if img is None: | |
raise RuntimeError("No image found to load at " + str(file_path)) | |
return img | |
if __name__ == '__main__': | |
from sensor_msgs.msg import Image, CompressedImage | |
it = ImageTools() | |
from cPickle import load | |
img = load(open('rgb_image.pickle', 'r')) | |
cv2_img = it.convert_ros_msg_to_cv2(img) | |
print(type(cv2_img)) | |
ros_img = it.convert_cv2_to_ros_msg(cv2_img) | |
print(type(ros_img)) | |
ros_compressed2 = it.convert_cv2_to_ros_compressed_msg(cv2_img) | |
print(type(ros_compressed2)) | |
compressed_ros_img = it.convert_ros_msg_to_ros_compressed_msg(ros_img) | |
print(type(compressed_ros_img)) | |
ros_img2 = it.convert_ros_compressed_msg_to_ros_msg(compressed_ros_img) | |
print(type(ros_img2)) | |
cv2_2 = it.convert_ros_compressed_to_cv2(compressed_ros_img) | |
print(type(cv2_2)) | |
cv2_3 = it.convert_to_cv2(cv2_img) | |
cv2_4 = it.convert_to_cv2(ros_img) | |
cv2_5 = it.convert_to_cv2(compressed_ros_img) | |
ros_3 = it.convert_to_ros_msg(cv2_img) | |
ros_4 = it.convert_to_ros_msg(ros_img) | |
ros_5 = it.convert_to_ros_msg(compressed_ros_img) | |
rosc_3 = it.convert_to_ros_compressed_msg(cv2_img) | |
rosc_4 = it.convert_to_ros_compressed_msg(ros_img) | |
rosc_5 = it.convert_to_ros_compressed_msg(compressed_ros_img) | |
depthcompimg = load(open('depth_compressed_image.pickle', 'r')) | |
depth_cv2 = it.convert_depth_to_cv2(depthcompimg) | |
depth_ros = it.convert_depth_to_ros_msg(depthcompimg) | |
depth_ros_comp = it.convert_depth_to_ros_compressed_msg(depthcompimg) | |
it.save_image(depth_cv2, 'depth_comp_cv2.jpg') | |
it.save_depth_image(depth_cv2, 'depth_comp_normalized_cv2.jpg') | |
depthimg = load(open('depth_image.pickle', 'r')) | |
depthimg_cv2 = it.convert_depth_to_cv2(depthimg) | |
depthimg_ros = it.convert_depth_to_ros_msg(depthimg) | |
depthimg_ros_comp = it.convert_depth_to_ros_compressed_msg(depthimg) | |
it.save_image(depth_cv2, 'depth_cv2.jpg') | |
it.save_depth_image(depth_cv2, 'depth_normalized_cv2.jpg') | |
it.save_image(cv2_img, 'cv2_image.jpg') | |
it.save_image(ros_img, 'ros_image.jpg') | |
it.save_image(compressed_ros_img, 'compressed_ros_image.jpg') | |
it.display_image(cv2_img) |
Hey @lucasw , I don't have 32FC1 sorry. The robot that required this work only published in 16UC1.
I think I figured it out here https://answers.ros.org/question/407659/how-to-generate-format-32fc1-compresseddepth-png-image-in-python-from-float-array/ - and it turned out it isn't actually 32fc1 despite the format name, just uint16 with a scale and offset (and the depth values get inverted)
That's an odd format! Thanks for figuring it out. I'm going to quote here your reply there just for the sake of conservation of information:
Here is what I came up with- I haven't yet checked to make sure the depth values when decompressed are correct, they just seemed reasonable as seen in rqt:
import cv2
import numpy as np
from sensor_msgs.msg import CompressedImage
def encode_compressed_depth_image_msg(depth_image: np.array,
depth_min=1.0, depth_max=10.0,
depth_quantization=100.0):
depth_image[depth_image > depth_max] = np.nan
depth_image[depth_image < depth_min] = np.nan
depth_z0 = depth_quantization
depth_quant_a = depth_z0 * (depth_z0 + 1.0)
depth_quant_b = 1.0 - depth_quant_a / depth_max
inv_depth = (depth_quant_a / depth_image + depth_quant_b).astype(np.uint16)
depth_encoded = cv2.imencode(".png", inv_depth)[1]
header = np.array([0.0, depth_quant_a, depth_quant_b], np.float32)
compressed_depth_msg = CompressedImage()
compressed_depth_msg.format = "32FC1; compressedDepth png"
compressed_depth_msg.data = header.tobytes() + depth_encoded.tobytes()
return compressed_depth_msg
The payload of the CompressedDepth 32FC1; compressedDepth png
is 12 header bytes which contains the conversion scaling numbers quant a & b, then an encoded 16-bit grayscale png follows (if you lop off those first 12 bytes and save the rest of the bytes to a file you can display it in a png viewer and see the depth values). The scaling numbers fixed above but could be changed per-frame if there was a reason to, by manipulating depth_max and depth_quantization (or refactor so depth_quant_a and b are set through some other means).
@awesomebytes thank you very much for publishing these conversion tools. For 2D RGB images, they work really well.
@Mechazo11 You are welcome! I'm very happy to know my snippets help people! :)
Do you have 32FC1 conversion functions?
32FC1; compressedDepth png
is of particular interest.