-
-
Save SebastianGrans/6ae5cab66e453a14a859b66cd9579239 to your computer and use it in GitHub Desktop.
import sys | |
import os | |
import rclpy | |
from rclpy.node import Node | |
import sensor_msgs.msg as sensor_msgs | |
import numpy as np | |
import open3d as o3d | |
class PCDListener(Node): | |
def __init__(self): | |
super().__init__('pcd_subsriber_node') | |
## This is for visualization of the received point cloud. | |
self.vis = o3d.visualization.Visualizer() | |
self.vis.create_window() | |
self.o3d_pcd = o3d.geometry.PointCloud() | |
# Set up a subscription to the 'pcd' topic with a callback to the | |
# function `listener_callback` | |
self.pcd_subscriber = self.create_subscription( | |
sensor_msgs.PointCloud2, # Msg type | |
'pcd', # topic | |
self.listener_callback, # Function to call | |
10 # QoS | |
) | |
def listener_callback(self, msg): | |
# Here we convert the 'msg', which is of the type PointCloud2. | |
# I ported the function read_points2 from | |
# the ROS1 package. | |
# https://github.com/ros/common_msgs/blob/noetic-devel/sensor_msgs/src/sensor_msgs/point_cloud2.py | |
pcd_as_numpy_array = np.array(list(read_points(msg))) | |
# The rest here is for visualization. | |
self.vis.remove_geometry(self.o3d_pcd) | |
self.o3d_pcd = o3d.geometry.PointCloud( | |
o3d.utility.Vector3dVector(pcd_as_numpy_array)) | |
self.vis.add_geometry(self.o3d_pcd) | |
self.vis.poll_events() | |
self.vis.update_renderer() | |
## The code below is "ported" from | |
# https://github.com/ros/common_msgs/tree/noetic-devel/sensor_msgs/src/sensor_msgs | |
# I'll make an official port and PR to this repo later: | |
# https://github.com/ros2/common_interfaces | |
import sys | |
from collections import namedtuple | |
import ctypes | |
import math | |
import struct | |
from sensor_msgs.msg import PointCloud2, PointField | |
_DATATYPES = {} | |
_DATATYPES[PointField.INT8] = ('b', 1) | |
_DATATYPES[PointField.UINT8] = ('B', 1) | |
_DATATYPES[PointField.INT16] = ('h', 2) | |
_DATATYPES[PointField.UINT16] = ('H', 2) | |
_DATATYPES[PointField.INT32] = ('i', 4) | |
_DATATYPES[PointField.UINT32] = ('I', 4) | |
_DATATYPES[PointField.FLOAT32] = ('f', 4) | |
_DATATYPES[PointField.FLOAT64] = ('d', 8) | |
def read_points(cloud, field_names=None, skip_nans=False, uvs=[]): | |
""" | |
Read points from a L{sensor_msgs.PointCloud2} message. | |
@param cloud: The point cloud to read from. | |
@type cloud: L{sensor_msgs.PointCloud2} | |
@param field_names: The names of fields to read. If None, read all fields. [default: None] | |
@type field_names: iterable | |
@param skip_nans: If True, then don't return any point with a NaN value. | |
@type skip_nans: bool [default: False] | |
@param uvs: If specified, then only return the points at the given coordinates. [default: empty list] | |
@type uvs: iterable | |
@return: Generator which yields a list of values for each point. | |
@rtype: generator | |
""" | |
assert isinstance(cloud, PointCloud2), 'cloud is not a sensor_msgs.msg.PointCloud2' | |
fmt = _get_struct_fmt(cloud.is_bigendian, cloud.fields, field_names) | |
width, height, point_step, row_step, data, isnan = cloud.width, cloud.height, cloud.point_step, cloud.row_step, cloud.data, math.isnan | |
unpack_from = struct.Struct(fmt).unpack_from | |
if skip_nans: | |
if uvs: | |
for u, v in uvs: | |
p = unpack_from(data, (row_step * v) + (point_step * u)) | |
has_nan = False | |
for pv in p: | |
if isnan(pv): | |
has_nan = True | |
break | |
if not has_nan: | |
yield p | |
else: | |
for v in range(height): | |
offset = row_step * v | |
for u in range(width): | |
p = unpack_from(data, offset) | |
has_nan = False | |
for pv in p: | |
if isnan(pv): | |
has_nan = True | |
break | |
if not has_nan: | |
yield p | |
offset += point_step | |
else: | |
if uvs: | |
for u, v in uvs: | |
yield unpack_from(data, (row_step * v) + (point_step * u)) | |
else: | |
for v in range(height): | |
offset = row_step * v | |
for u in range(width): | |
yield unpack_from(data, offset) | |
offset += point_step | |
def _get_struct_fmt(is_bigendian, fields, field_names=None): | |
fmt = '>' if is_bigendian else '<' | |
offset = 0 | |
for field in (f for f in sorted(fields, key=lambda f: f.offset) if field_names is None or f.name in field_names): | |
if offset < field.offset: | |
fmt += 'x' * (field.offset - offset) | |
offset = field.offset | |
if field.datatype not in _DATATYPES: | |
print('Skipping unknown PointField datatype [%d]' % field.datatype, file=sys.stderr) | |
else: | |
datatype_fmt, datatype_length = _DATATYPES[field.datatype] | |
fmt += field.count * datatype_fmt | |
offset += field.count * datatype_length | |
return fmt | |
def main(args=None): | |
# Boilerplate code. | |
rclpy.init(args=args) | |
pcd_listener = PCDListener() | |
rclpy.spin(pcd_listener) | |
# Destroy the node explicitly | |
# (optional - otherwise it will be done automatically | |
# when the garbage collector destroys the node object) | |
pcd_listener.destroy_node() | |
rclpy.shutdown() | |
if __name__ == '__main__': | |
main() |
Wow, it's very detailed and well made, thank you!!
it really helped me.
I've got a very big point cloud and the o3d.utility.Vector3dVector
line is throwing a runtime error without any description when using your sample above.
The following code works though. Sample from https://answers.ros.org/question/255351/how-o-save-a-pointcloud2-data-in-python/
gen = read_points(msg, skip_nans=True)
int_data = list(gen)
xyz = np.empty((len(int_data), 3))
rgb = np.empty((len(int_data), 3))
idx = 0
for x in int_data:
test = x[3]
# cast float32 to int so that bitwise operations are possible
s = struct.pack('>f' ,test)
i = struct.unpack('>l',s)[0]
# you can get back the float value by the inverse operations
pack = ctypes.c_uint32(i).value
r = (pack & 0x00FF0000)>> 16
g = (pack & 0x0000FF00)>> 8
b = (pack & 0x000000FF)
# prints r,g,b values in the 0-255 range
# x,y,z can be retrieved from the x[0],x[1],x[2]
# xyz = np.append(xyz,[[x[0],x[1],x[2]]], axis = 0)
# rgb = np.append(rgb,[[r,g,b]], axis = 0)
xyz[idx] = [x[0],x[1],x[2]]
rgb[idx] = [r,g,b]
idx = idx + 1
out_pcd = o3d.geometry.PointCloud()
out_pcd.points = o3d.utility.Vector3dVector(xyz)
out_pcd.colors = o3d.utility.Vector3dVector(rgb)
Any idea why the numpy_array returned from your code sample would not work?
I solved the error changing this line in read_points()
yield unpack_from(data, offset)[:3]
It because this line was returning lists of 4 positions and o3d.utility.Vector3dVector
works with lists of 3 positions.
Note: I don't know for what the 4 position value is for. Maybe it is the point color packed in float.
Thanks for the solution!
Hi!
I haven't worked with ROS in a while, but maybe my demo package can help you figure out how to use PointCloud2?
You can find it here: https://github.com/SebastianGrans/ROS2-Point-Cloud-Demo