Skip to content

Instantly share code, notes, and snippets.

@audrow
Last active January 3, 2023 06:41
Show Gist options
  • Save audrow/0305bc6c83fca780f77acdb227f9a626 to your computer and use it in GitHub Desktop.
Save audrow/0305bc6c83fca780f77acdb227f9a626 to your computer and use it in GitHub Desktop.
ROS2 PointCloud2 Publisher #open-robotics
# A way of reproducing https://github.com/ros2/rviz/issues/322
#
# Code source:
# https://github.com/SebastianGrans/ROS2-Point-Cloud-Demo/blob/79ef97fc92147a0d550543a69541139097cc3b35/pcd_demo/pcd_publisher/pcd_publisher_node.py
import rclpy
from rclpy.node import Node
import sensor_msgs.msg as sensor_msgs
import std_msgs.msg as std_msgs
import numpy as np
class PCDPublisher(Node):
def __init__(self):
super().__init__('pcd_publisher_node')
self.points = np.array([[1, 1, 1]])
print(self.points.shape)
self.pcd_publisher = self.create_publisher(sensor_msgs.PointCloud2, 'pcd', 10)
timer_period = 1/ 30.0
self.timer = self.create_timer(timer_period, self.timer_callback)
def timer_callback(self):
self.pcd = point_cloud(self.points, 'map')
self.pcd_publisher.publish(self.pcd)
def point_cloud(points, parent_frame):
""" Creates a point cloud message.
Args:
points: Nx3 array of xyz positions.
parent_frame: frame in which the point cloud is defined
Returns:
sensor_msgs/PointCloud2 message
Code source:
https://gist.github.com/pgorczak/5c717baa44479fa064eb8d33ea4587e0
"""
ros_dtype = sensor_msgs.PointField.FLOAT32
dtype = np.float32
itemsize = np.dtype(dtype).itemsize # A 32-bit float takes 4 bytes.
data = points.astype(dtype).tobytes()
fields = [sensor_msgs.PointField(
name=n, offset=i * itemsize, datatype=ros_dtype, count=1)
for i, n in enumerate('xyz')]
header = std_msgs.Header(frame_id=parent_frame)
return sensor_msgs.PointCloud2(
header=header,
height=1,
width=points.shape[0],
is_dense=False,
is_bigendian=False,
fields=fields,
point_step=(itemsize * 3), # Every point consists of three float32s.
row_step=(itemsize * 3 * points.shape[0]),
data=data
)
def main(args=None):
rclpy.init(args=args)
pcd_publisher = PCDPublisher()
rclpy.spin(pcd_publisher)
if __name__ == '__main__':
main()
@audrow
Copy link
Author

audrow commented Aug 21, 2020

Used to reproduce ros2/rviz#322.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment