-
-
Save jtbandes/c835d587ff5123bff12ebee2cd81bc56 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import colorsys | |
| import base64 | |
| import math | |
| import json | |
| import struct | |
| import requests | |
| from tqdm import tqdm | |
| from pyquaternion import Quaternion | |
| from mcap.writer import Writer | |
| scene_update_json = requests.get("https://github.com/foxglove/schemas/raw/94898bfc24f9629b42233497d386181fe5afb92b/schemas/jsonschema/SceneUpdate.json").text | |
| point_cloud_json = requests.get("https://github.com/foxglove/schemas/raw/94898bfc24f9629b42233497d386181fe5afb92b/schemas/jsonschema/PointCloud.json").text | |
| grid_json = requests.get("https://github.com/foxglove/schemas/raw/94898bfc24f9629b42233497d386181fe5afb92b/schemas/jsonschema/Grid.json").text | |
| frame_transform_json = requests.get( | |
| "https://github.com/foxglove/schemas/raw/94898bfc24f9629b42233497d386181fe5afb92b/schemas/jsonschema/FrameTransform.json" | |
| ).text | |
| pointcloud2_json = json.dumps( | |
| { | |
| "type": "object", | |
| "properties": { | |
| "header": { | |
| "type": "object", | |
| "properties": { | |
| "frame_id": {"type": "string"}, | |
| "stamp": { | |
| "type": "object", | |
| "properties": { | |
| "sec": {"type": "integer"}, | |
| "nsec": {"type": "integer"}, | |
| }, | |
| }, | |
| }, | |
| }, | |
| "height": {"type": "integer"}, | |
| "width": {"type": "integer"}, | |
| "fields": { | |
| "type": "array", | |
| "items": { | |
| "type": "object", | |
| "properties": { | |
| "name": {"type": "string"}, | |
| "offset": {"type": "integer"}, | |
| "datatype": {"type": "integer"}, | |
| "count": {"type": "integer"}, | |
| }, | |
| }, | |
| }, | |
| "is_bigendian": {"type": "boolean"}, | |
| "point_step": {"type": "integer"}, | |
| "row_step": {"type": "integer"}, | |
| "data": {"type": "string", "contentEncoding": "base64"}, | |
| "is_dense": {"type": "boolean"}, | |
| }, | |
| } | |
| ) | |
| def make_color(f): | |
| r, g, b = colorsys.hsv_to_rgb(f, 1, 1) | |
| return {"r": r, "g": g, "b": b, "a": f} | |
| with open(f"pointcloud_rgba.mcap", "wb") as stream: | |
| writer = Writer(stream) | |
| # The library argument help identify what tool wrote the file. | |
| writer.start(profile="", library="") | |
| schema1 = writer.register_schema( | |
| name="foxglove.PointCloud", | |
| encoding="jsonschema", | |
| data=point_cloud_json.encode(), | |
| ) | |
| ch1 = writer.register_channel( | |
| schema_id=schema1, | |
| topic="cloud", | |
| message_encoding="json", | |
| ) | |
| schema2 = writer.register_schema( | |
| name="foxglove.Grid", | |
| encoding="jsonschema", | |
| data=grid_json.encode(), | |
| ) | |
| ch2 = writer.register_channel( | |
| schema_id=schema2, | |
| topic="grid", | |
| message_encoding="json", | |
| ) | |
| schema3 = writer.register_schema( | |
| name="ros.sensor_msgs.PointCloud2", | |
| encoding="jsonschema", | |
| data=pointcloud2_json.encode(), | |
| ) | |
| ch3 = writer.register_channel( | |
| schema_id=schema3, | |
| topic="pc2", | |
| message_encoding="json", | |
| ) | |
| schema4 = writer.register_schema( | |
| name="foxglove.FrameTransform", | |
| encoding="jsonschema", | |
| data=frame_transform_json.encode(), | |
| ) | |
| ch4 = writer.register_channel( | |
| schema_id=schema4, | |
| topic="transforms", | |
| message_encoding="json", | |
| ) | |
| frame_interval_ns = 50_000_000 | |
| nframes = 1000 | |
| for frame in tqdm(range(0, nframes)): | |
| t = frame * frame_interval_ns | |
| tt = t / 1_000_000_000 | |
| timestamp = {"sec": t // 1_000_000_000, "nsec": t % 1_000_000_000} | |
| for i, child in enumerate(["cloud", "grid", "pc2"]): | |
| q = Quaternion(axis=[0, 0, 1], angle=2 * math.pi * i / 3) | |
| frame_transform = { | |
| "timestamp": timestamp, | |
| "parent_frame_id": "root", | |
| "child_frame_id": child, | |
| "translation": {"x": 0, "y": 0, "z": 0}, | |
| "rotation": {"x": q.x, "y": q.y, "z": q.z, "w": q.w}, | |
| } | |
| writer.add_message(ch4, t, json.dumps(frame_transform).encode(), t) | |
| point_stride = 7 * 4 | |
| height = 20 | |
| width = 20 | |
| def color(x, y): | |
| r = 0.5 + 0.5 * (math.sin(2 * math.pi * tt + x / width)) | |
| g = y / height | |
| b = 0.5 | |
| a = 0.5 + 0.5 * ((x / width) * (y / height)) | |
| return r, g, b, a | |
| offset = 0 | |
| data = bytearray(width * height * point_stride) | |
| for x in range(width): | |
| for y in range(height): | |
| r, g, b, a = color(x, y) | |
| struct.pack_into( | |
| "<fffBBBB", | |
| data, | |
| offset, | |
| x + 0.2 * y / height * math.sin(2 * math.pi * (tt + y / height)), | |
| y, | |
| 0, | |
| round(r * 255), | |
| round(g * 255), | |
| round(b * 255), | |
| round(a * 255), | |
| ) | |
| offset += point_stride | |
| fields = [ | |
| {"name": "x", "offset": 0, "type": 7}, | |
| {"name": "y", "offset": 4, "type": 7}, | |
| {"name": "z", "offset": 8, "type": 7}, | |
| {"name": "red", "offset": 12, "type": 1}, | |
| {"name": "green", "offset": 13, "type": 1}, | |
| {"name": "blue", "offset": 14, "type": 1}, | |
| {"name": "alpha", "offset": 15, "type": 1}, | |
| ] | |
| pointcloud = { | |
| "timestamp": timestamp, | |
| "frame_id": "cloud", | |
| "pose": {"position": {"x": 0, "y": 0, "z": 0}, "orientation": {"x": 0, "y": 0, "z": 0, "w": 1}}, | |
| "point_stride": point_stride, | |
| "fields": fields, | |
| "data": base64.b64encode(data).decode(), | |
| } | |
| writer.add_message(ch1, t, json.dumps(pointcloud).encode(), t) | |
| grid = { | |
| "timestamp": timestamp, | |
| "frame_id": "grid", | |
| "pose": {"position": {"x": 0, "y": 0, "z": 0}, "orientation": {"x": 0, "y": 0, "z": 0, "w": 1}}, | |
| "column_count": width, | |
| "row_count": height, | |
| "cell_stride": point_stride, | |
| "row_stride": width * point_stride, | |
| "fields": fields, | |
| "data": base64.b64encode(data).decode(), | |
| } | |
| writer.add_message(ch2, t, json.dumps(grid).encode(), t) | |
| pc2_point_stride = 16 | |
| pc2_fields = [ | |
| {"name": "x", "offset": 0, "datatype": 7, "count": 1}, | |
| {"name": "y", "offset": 4, "datatype": 7, "count": 1}, | |
| {"name": "z", "offset": 8, "datatype": 7, "count": 1}, | |
| {"name": "rgba", "offset": 12, "datatype": 7, "count": 1}, | |
| ] | |
| pc2_data = bytearray(width * height * pc2_point_stride) | |
| offset = 0 | |
| for x in range(width): | |
| for y in range(height): | |
| r, g, b, a = color(x, y) | |
| struct.pack_into( | |
| "<fffBBBB", | |
| pc2_data, | |
| offset, | |
| x + 0.2 * y / height * math.sin(2 * math.pi * (tt + y / height)), | |
| y, | |
| 0, | |
| round(b * 255), | |
| round(g * 255), | |
| round(r * 255), | |
| round(a * 255), | |
| ) | |
| offset += pc2_point_stride | |
| pc2 = { | |
| "header": {"frame_id": "pc2", "stamp": timestamp}, | |
| "height": height, | |
| "width": width, | |
| "fields": pc2_fields, | |
| "is_bigendian": False, | |
| "point_step": pc2_point_stride, | |
| "row_step": width * pc2_point_stride, | |
| "data": base64.b64encode(pc2_data).decode(), | |
| "is_dense": True, | |
| } | |
| writer.add_message(ch3, t, json.dumps(pc2).encode(), t) | |
| writer.finish() | |
| stream.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment