This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import colorsys | |
import base64 | |
import math | |
import json | |
import struct | |
import requests | |
from tqdm import tqdm | |
from pyquaternion import Quaternion | |
from mcap.writer import Writer | |
# All Foxglove JSON schemas are fetched from one pinned commit so the generated
# file does not change when the upstream schemas evolve.
_SCHEMA_BASE_URL = (
    "https://github.com/foxglove/schemas/raw/"
    "94898bfc24f9629b42233497d386181fe5afb92b/schemas/jsonschema"
)


def _fetch_schema(name):
    """Download the Foxglove JSON schema called *name* and return its text.

    Args:
        name: Schema file name without the ``.json`` suffix, e.g. ``"Grid"``.

    Returns:
        The response body as a string.

    Raises:
        requests.HTTPError: If the server answers with an error status, so an
            error page is never registered as a schema by mistake.
        requests.Timeout: If the server does not answer within 30 seconds;
            without a timeout ``requests.get`` may block indefinitely.
    """
    response = requests.get(f"{_SCHEMA_BASE_URL}/{name}.json", timeout=30)
    response.raise_for_status()
    return response.text


scene_update_json = _fetch_schema("SceneUpdate")
point_cloud_json = _fetch_schema("PointCloud")
grid_json = _fetch_schema("Grid")
frame_transform_json = _fetch_schema("FrameTransform")
def _typed(kind, **extra):
    """Return a JSON-schema fragment for a value of type *kind*.

    Any *extra* keyword arguments are appended after the ``"type"`` key.
    """
    return {"type": kind, **extra}


def _object_of(**properties):
    """Return a JSON-schema fragment for an object with the given properties.

    Keyword order is preserved, so the serialized key order follows the call.
    """
    return {"type": "object", "properties": properties}


# Hand-written JSON schema for the PointCloud2-shaped messages this script
# emits, assembled from small fragments and serialized to a string.
_stamp_schema = _object_of(sec=_typed("integer"), nsec=_typed("integer"))
_header_schema = _object_of(frame_id=_typed("string"), stamp=_stamp_schema)
_point_field_schema = _object_of(
    name=_typed("string"),
    offset=_typed("integer"),
    datatype=_typed("integer"),
    count=_typed("integer"),
)

pointcloud2_json = json.dumps(
    _object_of(
        header=_header_schema,
        height=_typed("integer"),
        width=_typed("integer"),
        fields={"type": "array", "items": _point_field_schema},
        is_bigendian=_typed("boolean"),
        point_step=_typed("integer"),
        row_step=_typed("integer"),
        # The point buffer travels as a base64 string inside the JSON message.
        data=_typed("string", contentEncoding="base64"),
        is_dense=_typed("boolean"),
    )
)
def make_color(f):
    """Return an RGBA colour dict derived from the single value *f*.

    *f* is used as the hue of a fully saturated, full-value HSV colour and is
    also stored unchanged as the alpha component.

    Args:
        f: Hue passed to ``colorsys.hsv_to_rgb``; also used as alpha.

    Returns:
        A dict with the keys ``"r"``, ``"g"``, ``"b"`` and ``"a"``.
    """
    channels = colorsys.hsv_to_rgb(f, 1, 1)
    color = dict(zip("rgb", channels))
    color["a"] = f
    return color
# One point record: three little-endian float32 coordinates followed by four
# uint8 colour channels (16 bytes in total). Compiled once and reused.
_POINT_STRUCT = struct.Struct("<fffBBBB")


def _pack_frame(tt, width, height, point_stride, pc2_point_stride):
    """Pack one animation frame of points into two byte buffers.

    Both buffers hold the same ``width * height`` points in the same order
    (x outer, y inner). They differ only in the distance between records and
    in the byte order of the colour channels.

    Args:
        tt: Frame time in seconds; drives the x wobble and the red channel.
        width: Number of columns (range of x).
        height: Number of rows (range of y).
        point_stride: Bytes between consecutive records in the first buffer.
        pc2_point_stride: Bytes between consecutive records in the second
            buffer.

    Returns:
        A ``(data, pc2_data)`` tuple of ``bytearray`` objects. ``data`` stores
        colours as red, green, blue, alpha; ``pc2_data`` stores them as blue,
        green, red, alpha.
    """
    data = bytearray(width * height * point_stride)
    pc2_data = bytearray(width * height * pc2_point_stride)
    # Blue is constant for every point.
    b = round(0.5 * 255)
    offset = 0
    pc2_offset = 0
    for x in range(width):
        # Red depends only on the column and the time, not on the row.
        r = round((0.5 + 0.5 * (math.sin(2 * math.pi * tt + x / width))) * 255)
        for y in range(height):
            # x wobbles sideways over time; the amplitude grows with the row.
            px = x + 0.2 * y / height * math.sin(2 * math.pi * (tt + y / height))
            g = round(y / height * 255)
            a = round((0.5 + 0.5 * ((x / width) * (y / height))) * 255)
            _POINT_STRUCT.pack_into(data, offset, px, y, 0, r, g, b, a)
            _POINT_STRUCT.pack_into(pc2_data, pc2_offset, px, y, 0, b, g, r, a)
            offset += point_stride
            pc2_offset += pc2_point_stride
    return data, pc2_data


# Writes "pointcloud_rgba.mcap": for each frame, three frame transforms followed
# by the same animated point grid published as a foxglove.PointCloud, a
# foxglove.Grid and a PointCloud2-shaped message.
with open("pointcloud_rgba.mcap", "wb") as stream:
    writer = Writer(stream)
    # The library argument helps identify what tool wrote the file.
    writer.start(profile="", library="")

    schema1 = writer.register_schema(
        name="foxglove.PointCloud",
        encoding="jsonschema",
        data=point_cloud_json.encode(),
    )
    ch1 = writer.register_channel(
        schema_id=schema1,
        topic="cloud",
        message_encoding="json",
    )
    schema2 = writer.register_schema(
        name="foxglove.Grid",
        encoding="jsonschema",
        data=grid_json.encode(),
    )
    ch2 = writer.register_channel(
        schema_id=schema2,
        topic="grid",
        message_encoding="json",
    )
    schema3 = writer.register_schema(
        name="ros.sensor_msgs.PointCloud2",
        encoding="jsonschema",
        data=pointcloud2_json.encode(),
    )
    ch3 = writer.register_channel(
        schema_id=schema3,
        topic="pc2",
        message_encoding="json",
    )
    schema4 = writer.register_schema(
        name="foxglove.FrameTransform",
        encoding="jsonschema",
        data=frame_transform_json.encode(),
    )
    ch4 = writer.register_channel(
        schema_id=schema4,
        topic="transforms",
        message_encoding="json",
    )

    frame_interval_ns = 50_000_000
    nframes = 1000

    # Everything below is identical for every frame, so it is built once here
    # instead of on each of the `nframes` iterations.
    height = 20
    width = 20
    # The first buffer uses a 28-byte stride although a record is 16 bytes;
    # the remaining 12 bytes of each slot stay zero.
    # NOTE(review): presumably deliberate, to exercise a stride larger than
    # the record size — confirm.
    point_stride = 7 * 4
    pc2_point_stride = 16
    identity_pose = {
        "position": {"x": 0, "y": 0, "z": 0},
        "orientation": {"x": 0, "y": 0, "z": 0, "w": 1},
    }
    # Offsets follow the "<fffBBBB" record layout. Type codes 7 and 1 label
    # the float32 and uint8 members respectively (assumed to match the
    # Foxglove numeric type enum — verify against the schema).
    fields = [
        {"name": "x", "offset": 0, "type": 7},
        {"name": "y", "offset": 4, "type": 7},
        {"name": "z", "offset": 8, "type": 7},
        {"name": "red", "offset": 12, "type": 1},
        {"name": "green", "offset": 13, "type": 1},
        {"name": "blue", "offset": 14, "type": 1},
        {"name": "alpha", "offset": 15, "type": 1},
    ]
    # Here the four colour bytes are described as one packed 4-byte "rgba"
    # field at offset 12.
    pc2_fields = [
        {"name": "x", "offset": 0, "datatype": 7, "count": 1},
        {"name": "y", "offset": 4, "datatype": 7, "count": 1},
        {"name": "z", "offset": 8, "datatype": 7, "count": 1},
        {"name": "rgba", "offset": 12, "datatype": 7, "count": 1},
    ]
    # Each child frame is rotated about the z axis by a third of a turn more
    # than the previous one; the rotations never change between frames.
    frame_rotations = []
    for i, child in enumerate(["cloud", "grid", "pc2"]):
        q = Quaternion(axis=[0, 0, 1], angle=2 * math.pi * i / 3)
        frame_rotations.append((child, {"x": q.x, "y": q.y, "z": q.z, "w": q.w}))

    for frame in tqdm(range(0, nframes)):
        t = frame * frame_interval_ns
        tt = t / 1_000_000_000
        timestamp = {"sec": t // 1_000_000_000, "nsec": t % 1_000_000_000}

        for child, rotation in frame_rotations:
            frame_transform = {
                "timestamp": timestamp,
                "parent_frame_id": "root",
                "child_frame_id": child,
                "translation": {"x": 0, "y": 0, "z": 0},
                "rotation": rotation,
            }
            writer.add_message(ch4, t, json.dumps(frame_transform).encode(), t)

        data, pc2_data = _pack_frame(tt, width, height, point_stride, pc2_point_stride)
        # The point cloud and the grid carry the very same buffer, so it is
        # base64-encoded only once.
        encoded_data = base64.b64encode(data).decode()

        pointcloud = {
            "timestamp": timestamp,
            "frame_id": "cloud",
            "pose": identity_pose,
            "point_stride": point_stride,
            "fields": fields,
            "data": encoded_data,
        }
        writer.add_message(ch1, t, json.dumps(pointcloud).encode(), t)

        grid = {
            "timestamp": timestamp,
            "frame_id": "grid",
            "pose": identity_pose,
            "column_count": width,
            "row_count": height,
            "cell_stride": point_stride,
            "row_stride": width * point_stride,
            "fields": fields,
            "data": encoded_data,
        }
        writer.add_message(ch2, t, json.dumps(grid).encode(), t)

        pc2 = {
            "header": {"frame_id": "pc2", "stamp": timestamp},
            "height": height,
            "width": width,
            "fields": pc2_fields,
            "is_bigendian": False,
            "point_step": pc2_point_stride,
            "row_step": width * pc2_point_stride,
            "data": base64.b64encode(pc2_data).decode(),
            "is_dense": True,
        }
        writer.add_message(ch3, t, json.dumps(pc2).encode(), t)

    # Finish the MCAP file while the stream is still open; the `with`
    # statement then closes the stream, so no explicit close() is needed.
    writer.finish()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment