Skip to content

Instantly share code, notes, and snippets.

@PierrickKoch
Last active June 8, 2022 09:14
Show Gist options
  • Save PierrickKoch/01181660d3427990db64056df43257a5 to your computer and use it in GitHub Desktop.
Save PierrickKoch/01181660d3427990db64056df43257a5 to your computer and use it in GitHub Desktop.
import rospy
import numpy
from std_msgs.msg import Header
from sensor_msgs.msg import PointCloud2, PointField
def cloud_from_numpy(msg, points):
# use only if points data is packed (no padding for memory alignment)
msg.height = 1
msg.width = len(points)
msg.point_step = points.itemsize
msg.row_step = msg.point_step * msg.width
msg.data = points.tobytes()
return msg
msg = PointCloud2()
msg.header.frame_id = 'map'
msg.fields = [
PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1),
PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1),
PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1),
PointField(name='intensity', offset=12, datatype=PointField.FLOAT32, count=1)
]
dtype = numpy.dtype([(name, numpy.float32) for name in 'xyzi'])
rospy.init_node('lidar_test_ros_bridge_carla')
lidar_publisher = rospy.Publisher('/lidar', PointCloud2, queue_size=1)
while not rospy.is_shutdown():
random_cloud = numpy.random.random((123, 4)).astype(numpy.float32) * 10 - 5
raw_data = memoryview(random_cloud.tobytes())
lidar_data = numpy.frombuffer(raw_data, dtype).copy()
lidar_data['y'] *= -1
point_cloud_msg = cloud_from_numpy(msg, lidar_data)
lidar_publisher.publish(point_cloud_msg)
rospy.sleep(.2)
import ctypes
import struct
from std_msgs.msg import Header
from sensor_msgs.msg import PointCloud2, PointField
_DATATYPES = {}
_DATATYPES[PointField.INT8] = ('b', 1)
_DATATYPES[PointField.UINT8] = ('B', 1)
_DATATYPES[PointField.INT16] = ('h', 2)
_DATATYPES[PointField.UINT16] = ('H', 2)
_DATATYPES[PointField.INT32] = ('i', 4)
_DATATYPES[PointField.UINT32] = ('I', 4)
_DATATYPES[PointField.FLOAT32] = ('f', 4)
_DATATYPES[PointField.FLOAT64] = ('d', 8)
def _get_struct_fmt(is_bigendian, fields, field_names=None):
fmt = '>' if is_bigendian else '<'
offset = 0
for field in (f for f in sorted(fields, key=lambda f: f.offset)
if field_names is None or f.name in field_names):
if offset < field.offset:
fmt += 'x' * (field.offset - offset)
offset = field.offset
if field.datatype not in _DATATYPES:
print('Skipping unknown PointField datatype [{}]' % field.datatype, file=sys.stderr)
else:
datatype_fmt, datatype_length = _DATATYPES[field.datatype]
fmt += field.count * datatype_fmt
offset += field.count * datatype_length
return fmt
def create_cloud(header, fields, points):
"""
Create a L{sensor_msgs.msg.PointCloud2} message.
@param header: The point cloud header.
@type header: L{std_msgs.msg.Header}
@param fields: The point cloud fields.
@type fields: iterable of L{sensor_msgs.msg.PointField}
@param points: The point cloud points.
@type points: list of iterables, i.e. one iterable for each point, with the
elements of each iterable being the values of the fields for
that point (in the same order as the fields parameter)
@return: The point cloud.
@rtype: L{sensor_msgs.msg.PointCloud2}
"""
cloud_struct = struct.Struct(_get_struct_fmt(False, fields))
buff = ctypes.create_string_buffer(cloud_struct.size * len(points))
point_step, pack_into = cloud_struct.size, cloud_struct.pack_into
offset = 0
for p in points:
pack_into(buff, offset, *p)
offset += point_step
return PointCloud2(header=header,
height=1,
width=len(points),
is_dense=False,
is_bigendian=False,
fields=fields,
point_step=cloud_struct.size,
row_step=cloud_struct.size * len(points),
data=buff.raw)
def cloud_from_numpy(msg, points):
# use only if points data is packed (no padding for memory alignment)
msg.height = 1
msg.width = len(points)
msg.point_step = points.itemsize
msg.row_step = msg.point_step * msg.width
msg.data = points.tobytes()
return msg
msg = PointCloud2()
msg.header.frame_id = 'map'
msg.fields = [
PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1),
PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1),
PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1),
PointField(name='intensity', offset=12, datatype=PointField.FLOAT32, count=1)
]
# import timeit
# printit = lambda code: print('%.3e ms : %s' % (timeit.timeit(code, number=1000, globals=globals()), code))
import time
def printit(code, number=1000):
t0 = time.time()
for n in range(number):
eval(code, globals())
dt = time.time() - t0
print('%.3e ms : %s' % (dt, code))
import numpy
dtype = numpy.dtype([(name, numpy.float32) for name in 'xyzi'])
random_cloud = numpy.random.random((12345, 4)).astype(numpy.float32) * 10 - 5
import sys
# Carla API raw_data is a PyMemoryView_FromMemory from GetRawDataAsBuffer, cf:
# https://github.com/carla-simulator/carla/blob/master/PythonAPI/carla/source/libcarla/SensorData.cpp#L185
if sys.version_info.major >= 3:
raw_data = memoryview(random_cloud.tobytes())
else:
raw_data = buffer(random_cloud.tobytes())
printit("bytes(random_cloud)")
printit("random_cloud.tobytes()")
printit("numpy.frombuffer(raw_data, dtype)") # readonly !
printit("numpy.fromstring(bytes(raw_data), dtype)") # DeprecationWarning !
printit("numpy.frombuffer(raw_data, dtype).copy()")
printit("numpy.copy(numpy.frombuffer(raw_data, dtype))")
lidar_data = numpy.frombuffer(raw_data, dtype).copy()
lidar_data['y'] *= -1
point_cloud_msg = cloud_from_numpy(msg, lidar_data)
point_cloud_msg_old = create_cloud(msg.header, msg.fields, lidar_data)
assert point_cloud_msg == point_cloud_msg_old, "messages missmatch"
printit("cloud_from_numpy(msg, lidar_data)")
printit("create_cloud(msg.header, msg.fields, lidar_data)")
"""
# Output:
6.210e-03 ms : bytes(random_cloud)
5.847e-03 ms : random_cloud.tobytes()
4.093e-04 ms : numpy.frombuffer(raw_data, dtype)
<timeit-src>:6: DeprecationWarning: The binary mode of fromstring is deprecated, as it behaves surprisingly on unicode inputs. Use frombuffer instead
1.478e-02 ms : numpy.fromstring(bytes(raw_data), dtype)
7.445e-03 ms : numpy.frombuffer(raw_data, dtype).copy()
7.847e-03 ms : numpy.copy(numpy.frombuffer(raw_data, dtype))
6.125e-03 ms : cloud_from_numpy(msg, lidar_data)
2.982e+01 ms : create_cloud(msg.header, msg.fields, lidar_data)
# ==> create_cloud speedup = 4868 times faster!
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment