Created
March 17, 2023 11:39
-
-
Save awesomebytes/1ff14904145631ab412a93bb9e910250 to your computer and use it in GitHub Desktop.
The output of experimenting with ChatGPT writing and helping to fix a node to project pointcloud points on an image
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import rospy | |
import sensor_msgs.point_cloud2 as pc2 | |
from sensor_msgs.msg import PointCloud2, CameraInfo, CompressedImage | |
from cv_bridge import CvBridge | |
import tf2_ros | |
import tf2_py as tf2 | |
import tf2_sensor_msgs | |
import cv2 | |
import numpy as np | |
import time | |
def intensity_to_rainbow_color_like_rviz(value, min_intensity, diff_intensity, invert=False): | |
""" | |
Transforms an intensity value to RGB colors like Rviz does | |
Code translated from C++ to Python via ChatGPT | |
https://github.com/ros-visualization/rviz/blob/noetic-devel/src/rviz/default_plugin/point_cloud_transformers.cpp#L45-L69 | |
""" | |
value = 1.0 - (value - min_intensity) / diff_intensity | |
if invert: | |
value = 1.0 - value | |
# this is HSV color palette with hue values going only from 0.0 to 0.833333. | |
value = min(value, 1.0) | |
value = max(value, 0.0) | |
h = value * 5.0 + 1.0 | |
i = int(h // 1) | |
f = h - i | |
if not (i % 2): | |
f = 1 - f # if i is even | |
n = 1 - f | |
color = [0, 0, 0] | |
if i <= 1: | |
color[0], color[1], color[2] = n, 0, 1 | |
elif i == 2: | |
color[0], color[1], color[2] = 0, n, 1 | |
elif i == 3: | |
color[0], color[1], color[2] = 0, 1, n | |
elif i == 4: | |
color[0], color[1], color[2] = n, 1, 0 | |
elif i >= 5: | |
color[0], color[1], color[2] = 1, n, 0 | |
# The following code expects values in between 0 - 255 | |
color[0] = int(color[0] * 255) | |
color[1] = int(color[1] * 255) | |
color[2] = int(color[2] * 255) | |
# return color[0], color[1], color[2] | |
# Image is BGR, so return bgr | |
return color[2], color[1], color[0] | |
class PointCloudProjector: | |
def __init__(self): | |
rospy.init_node('pointcloud_projector', anonymous=True) | |
self.bridge = CvBridge() | |
self.tf_buffer = tf2_ros.Buffer() | |
self.tf_listener = tf2_ros.TransformListener(self.tf_buffer) | |
self.camera_info = None | |
self.image_msg = None | |
self.pointcloud = None | |
self.transform = None | |
self.pointcloud_sub = rospy.Subscriber('/velodyne_points', PointCloud2, self.pointcloud_callback, queue_size=1) | |
self.camera_info_sub = rospy.Subscriber('/realsense_middle/color/camera_info', CameraInfo, self.camera_info_callback, queue_size=1) | |
self.compressed_image_sub = rospy.Subscriber('/realsense_middle/color/image_raw/compressed', CompressedImage, self.compressed_image_callback, queue_size=1) | |
def pointcloud_callback(self, data): | |
self.pointcloud = data | |
def do_work(self): | |
if self.camera_info is None or self.image_msg is None or self.pointcloud is None: | |
return | |
data = self.pointcloud | |
t1 = time.time() | |
try: | |
# Only get the transform once (it's fixed) | |
if self.transform is None: | |
rospy.loginfo("Getting transform in between: {} and {}".format(self.camera_info.header.frame_id, data.header.frame_id)) | |
transform = self.tf_buffer.lookup_transform(self.camera_info.header.frame_id, data.header.frame_id, rospy.Time(0), rospy.Duration(1.0)) | |
rospy.loginfo("Got transform: {}".format(transform)) | |
self.transform = transform | |
self.tf_listener.unregister() | |
data_transformed = tf2_sensor_msgs.do_transform_cloud(data, self.transform) | |
except (tf2.LookupException, tf2.ExtrapolationException) as ex: | |
rospy.logerr('TF transform error: %s', ex) | |
return | |
points_with_intensity = pc2.read_points(data_transformed, field_names=("x", "y", "z", "intensity"), skip_nans=True) | |
# Filter points, we only want to show the points in front of the camera | |
points_with_intensity = [(x, y, z, intensity) for x, y, z, intensity in points_with_intensity if z > 1.0] | |
t2 = time.time() | |
points_projected, intensities = self.project_points_to_image(points_with_intensity) | |
t3 = time.time() | |
self.visualize_projected_points(points_projected, intensities) | |
t4 = time.time() | |
rospy.loginfo("\nTime to transform and read points: {}s".format(t2 - t1)) | |
rospy.loginfo("Time to project points to image: {}s".format(t3 - t2)) | |
rospy.loginfo("Time to visualize projected points: {}s".format(t4 - t3)) | |
rospy.loginfo("Total time: {}s".format(t4 - t1)) | |
def camera_info_callback(self, data): | |
self.camera_info = data | |
# Only need one | |
self.camera_info_sub.unregister() | |
def compressed_image_callback(self, data): | |
# Only store, convert to cv2 only when needed | |
self.image_msg = data | |
def project_points_to_image(self, points_with_intensity): | |
camera_matrix = np.array(self.camera_info.K).reshape(3, 3) | |
distortion_coeffs = np.array(self.camera_info.D) | |
points_3d = np.fromiter((x for p in points_with_intensity for x in p[:3]), dtype=float).reshape((-1, 3)) | |
intensities = np.fromiter((p[3] for p in points_with_intensity), dtype=np.uint8) | |
# Originally, iterating again over the same generator, which does not work | |
# provided an empty list of intensities | |
# intensities = np.array([point[3] for point in points_with_intensity]) # original code | |
# intensities_normalized = ((intensities - intensities.min()) / (intensities.max() - intensities.min()) * 255).astype(np.uint8) | |
points_projected, _ = cv2.projectPoints(points_3d, np.eye(3), np.zeros(3), camera_matrix, distortion_coeffs) | |
return points_projected.reshape(-1, 2).astype(np.int32), intensities #intensities_normalized | |
def visualize_projected_points(self, points_projected, intensities): | |
t5 = time.time() | |
try: | |
image = self.bridge.compressed_imgmsg_to_cv2(self.image_msg, desired_encoding='bgr8') | |
except Exception as ex: | |
rospy.logerr('Error converting compressed image to cv2: %s', ex) | |
# image = self.image.copy() # no need to copy, only used here | |
height, width, _ = image.shape | |
min_intensity = intensities.min() | |
max_intensity = intensities.max() | |
diff_intensity = max_intensity - min_intensity | |
if diff_intensity == 0: | |
diff_intensity = 1e20 # Something huge, so we get a 0 (and we dont divide by 0) | |
t6 = time.time() | |
count = 0 | |
count_else = 0 | |
for pt, intensity in zip(points_projected, intensities): | |
count += 1 | |
# Only paint the points that actually land in the image | |
x, y = pt | |
if 0 <= x < width and 0 <= y < height: | |
intensity = int(intensity) | |
# convert intensity to red-green-yellow like Rviz does | |
rgb = intensity_to_rainbow_color_like_rviz(intensity, min_intensity, max_intensity, invert=False) | |
#rgb = (1, 0, 0) | |
cv2.circle(image, tuple(pt), 1, rgb, -1) | |
else: | |
count_else += 1 | |
rospy.logwarn("# of points: {}".format(count)) | |
rospy.logwarn("# of points out of image: {}".format(count_else)) | |
t7 = time.time() | |
cv2.imshow('Projected Points', image) | |
cv2.waitKey(1) | |
t8 = time.time() | |
rospy.loginfo("Time to copy image and calc minmax intensity: {}s".format(t6-t5)) | |
rospy.loginfo("Time to loop: {}s".format(t7 - t6)) | |
rospy.loginfo("Time to show image: {}s".format(t8 - t7)) | |
def run(self): | |
rate = rospy.Rate(4) # Painting the circles takes 0.15s, transforming the points 0.06s, so in total, about 0.21s, we can't go faster than that | |
while not rospy.is_shutdown(): | |
self.do_work() | |
rate.sleep() | |
if __name__ == '__main__': | |
try: | |
projector = PointCloudProjector() | |
projector.run() | |
except rospy.ROSInterruptException: | |
pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment