Skip to content

Instantly share code, notes, and snippets.

@SebastianGrans
Created July 27, 2020 16:29
Show Gist options
  • Save SebastianGrans/6ae5cab66e453a14a859b66cd9579239 to your computer and use it in GitHub Desktop.
Save SebastianGrans/6ae5cab66e453a14a859b66cd9579239 to your computer and use it in GitHub Desktop.
ROS2 Node that subscribes to PointCloud2 messages and visualizes them using Open3D.
import sys
import os
import rclpy
from rclpy.node import Node
import sensor_msgs.msg as sensor_msgs
import numpy as np
import open3d as o3d
class PCDListener(Node):
def __init__(self):
super().__init__('pcd_subsriber_node')
## This is for visualization of the received point cloud.
self.vis = o3d.visualization.Visualizer()
self.vis.create_window()
self.o3d_pcd = o3d.geometry.PointCloud()
# Set up a subscription to the 'pcd' topic with a callback to the
# function `listener_callback`
self.pcd_subscriber = self.create_subscription(
sensor_msgs.PointCloud2, # Msg type
'pcd', # topic
self.listener_callback, # Function to call
10 # QoS
)
def listener_callback(self, msg):
# Here we convert the 'msg', which is of the type PointCloud2.
# I ported the function read_points2 from
# the ROS1 package.
# https://github.com/ros/common_msgs/blob/noetic-devel/sensor_msgs/src/sensor_msgs/point_cloud2.py
pcd_as_numpy_array = np.array(list(read_points(msg)))
# The rest here is for visualization.
self.vis.remove_geometry(self.o3d_pcd)
self.o3d_pcd = o3d.geometry.PointCloud(
o3d.utility.Vector3dVector(pcd_as_numpy_array))
self.vis.add_geometry(self.o3d_pcd)
self.vis.poll_events()
self.vis.update_renderer()
## The code below is "ported" from
# https://github.com/ros/common_msgs/tree/noetic-devel/sensor_msgs/src/sensor_msgs
# I'll make an official port and PR to this repo later:
# https://github.com/ros2/common_interfaces
import sys
from collections import namedtuple
import ctypes
import math
import struct
from sensor_msgs.msg import PointCloud2, PointField
_DATATYPES = {}
_DATATYPES[PointField.INT8] = ('b', 1)
_DATATYPES[PointField.UINT8] = ('B', 1)
_DATATYPES[PointField.INT16] = ('h', 2)
_DATATYPES[PointField.UINT16] = ('H', 2)
_DATATYPES[PointField.INT32] = ('i', 4)
_DATATYPES[PointField.UINT32] = ('I', 4)
_DATATYPES[PointField.FLOAT32] = ('f', 4)
_DATATYPES[PointField.FLOAT64] = ('d', 8)
def read_points(cloud, field_names=None, skip_nans=False, uvs=[]):
"""
Read points from a L{sensor_msgs.PointCloud2} message.
@param cloud: The point cloud to read from.
@type cloud: L{sensor_msgs.PointCloud2}
@param field_names: The names of fields to read. If None, read all fields. [default: None]
@type field_names: iterable
@param skip_nans: If True, then don't return any point with a NaN value.
@type skip_nans: bool [default: False]
@param uvs: If specified, then only return the points at the given coordinates. [default: empty list]
@type uvs: iterable
@return: Generator which yields a list of values for each point.
@rtype: generator
"""
assert isinstance(cloud, PointCloud2), 'cloud is not a sensor_msgs.msg.PointCloud2'
fmt = _get_struct_fmt(cloud.is_bigendian, cloud.fields, field_names)
width, height, point_step, row_step, data, isnan = cloud.width, cloud.height, cloud.point_step, cloud.row_step, cloud.data, math.isnan
unpack_from = struct.Struct(fmt).unpack_from
if skip_nans:
if uvs:
for u, v in uvs:
p = unpack_from(data, (row_step * v) + (point_step * u))
has_nan = False
for pv in p:
if isnan(pv):
has_nan = True
break
if not has_nan:
yield p
else:
for v in range(height):
offset = row_step * v
for u in range(width):
p = unpack_from(data, offset)
has_nan = False
for pv in p:
if isnan(pv):
has_nan = True
break
if not has_nan:
yield p
offset += point_step
else:
if uvs:
for u, v in uvs:
yield unpack_from(data, (row_step * v) + (point_step * u))
else:
for v in range(height):
offset = row_step * v
for u in range(width):
yield unpack_from(data, offset)
offset += point_step
def _get_struct_fmt(is_bigendian, fields, field_names=None):
fmt = '>' if is_bigendian else '<'
offset = 0
for field in (f for f in sorted(fields, key=lambda f: f.offset) if field_names is None or f.name in field_names):
if offset < field.offset:
fmt += 'x' * (field.offset - offset)
offset = field.offset
if field.datatype not in _DATATYPES:
print('Skipping unknown PointField datatype [%d]' % field.datatype, file=sys.stderr)
else:
datatype_fmt, datatype_length = _DATATYPES[field.datatype]
fmt += field.count * datatype_fmt
offset += field.count * datatype_length
return fmt
def main(args=None):
# Boilerplate code.
rclpy.init(args=args)
pcd_listener = PCDListener()
rclpy.spin(pcd_listener)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
pcd_listener.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
@MeyerBuaharon
Copy link

Hey! thanks for sharing your code here!
Do you think you can help me with something similar to this code right here?

I want to generate an ROS file from existing pointcloud, I use rosbag to create the ros file, but I'm not familiar with PointCloud2.
So my question is how can I create a PointCloud2 instance with demo data? only if you know and have time to help, would greatly appreciate it.

@SebastianGrans
Copy link
Author

Hi!
I haven't worked with ROS in a while, but maybe my demo package can help you figure out how to use PointCloud2?

You can find it here: https://github.com/SebastianGrans/ROS2-Point-Cloud-Demo

@MeyerBuaharon
Copy link

Wow, it's very detailed and well made, thank you!!
it really helped me.

@dasiths
Copy link

dasiths commented Oct 24, 2022

I've got a very big point cloud and the o3d.utility.Vector3dVector line is throwing a runtime error without any description when using your sample above.

The following code works though. Sample from https://answers.ros.org/question/255351/how-o-save-a-pointcloud2-data-in-python/

        gen = read_points(msg, skip_nans=True)
        int_data = list(gen)

        xyz = np.empty((len(int_data), 3))
        rgb = np.empty((len(int_data), 3))
        idx = 0
        for x in int_data:
            test = x[3] 
            # cast float32 to int so that bitwise operations are possible
            s = struct.pack('>f' ,test)
            i = struct.unpack('>l',s)[0]
            # you can get back the float value by the inverse operations
            pack = ctypes.c_uint32(i).value
            r = (pack & 0x00FF0000)>> 16
            g = (pack & 0x0000FF00)>> 8
            b = (pack & 0x000000FF)
            # prints r,g,b values in the 0-255 range
                        # x,y,z can be retrieved from the x[0],x[1],x[2]
            # xyz = np.append(xyz,[[x[0],x[1],x[2]]], axis = 0)
            # rgb = np.append(rgb,[[r,g,b]], axis = 0)
            xyz[idx] = [x[0],x[1],x[2]]
            rgb[idx] = [r,g,b]
            idx = idx + 1

        out_pcd = o3d.geometry.PointCloud()    
        out_pcd.points = o3d.utility.Vector3dVector(xyz)
        out_pcd.colors = o3d.utility.Vector3dVector(rgb)

Any idea why the numpy_array returned from your code sample would not work?

@BryanBetancur
Copy link

BryanBetancur commented Nov 1, 2022

I solved the error changing this line in read_points()
yield unpack_from(data, offset)[:3]
It because this line was returning lists of 4 positions and o3d.utility.Vector3dVector works with lists of 3 positions.
Note: I don't know for what the 4 position value is for. Maybe it is the point color packed in float.

@mkarklins
Copy link

Thanks for the solution!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment