Skip to content

Instantly share code, notes, and snippets.

@jtbandes
Last active February 23, 2023 01:58
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jtbandes/c835d587ff5123bff12ebee2cd81bc56 to your computer and use it in GitHub Desktop.
Save jtbandes/c835d587ff5123bff12ebee2cd81bc56 to your computer and use it in GitHub Desktop.
import colorsys
import base64
import math
import json
import struct
import requests
from tqdm import tqdm
from pyquaternion import Quaternion
from mcap.writer import Writer
scene_update_json = requests.get("https://github.com/foxglove/schemas/raw/94898bfc24f9629b42233497d386181fe5afb92b/schemas/jsonschema/SceneUpdate.json").text
point_cloud_json = requests.get("https://github.com/foxglove/schemas/raw/94898bfc24f9629b42233497d386181fe5afb92b/schemas/jsonschema/PointCloud.json").text
grid_json = requests.get("https://github.com/foxglove/schemas/raw/94898bfc24f9629b42233497d386181fe5afb92b/schemas/jsonschema/Grid.json").text
frame_transform_json = requests.get(
"https://github.com/foxglove/schemas/raw/94898bfc24f9629b42233497d386181fe5afb92b/schemas/jsonschema/FrameTransform.json"
).text
pointcloud2_json = json.dumps(
{
"type": "object",
"properties": {
"header": {
"type": "object",
"properties": {
"frame_id": {"type": "string"},
"stamp": {
"type": "object",
"properties": {
"sec": {"type": "integer"},
"nsec": {"type": "integer"},
},
},
},
},
"height": {"type": "integer"},
"width": {"type": "integer"},
"fields": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {"type": "string"},
"offset": {"type": "integer"},
"datatype": {"type": "integer"},
"count": {"type": "integer"},
},
},
},
"is_bigendian": {"type": "boolean"},
"point_step": {"type": "integer"},
"row_step": {"type": "integer"},
"data": {"type": "string", "contentEncoding": "base64"},
"is_dense": {"type": "boolean"},
},
}
)
def make_color(f):
r, g, b = colorsys.hsv_to_rgb(f, 1, 1)
return {"r": r, "g": g, "b": b, "a": f}
with open(f"pointcloud_rgba.mcap", "wb") as stream:
writer = Writer(stream)
# The library argument help identify what tool wrote the file.
writer.start(profile="", library="")
schema1 = writer.register_schema(
name="foxglove.PointCloud",
encoding="jsonschema",
data=point_cloud_json.encode(),
)
ch1 = writer.register_channel(
schema_id=schema1,
topic="cloud",
message_encoding="json",
)
schema2 = writer.register_schema(
name="foxglove.Grid",
encoding="jsonschema",
data=grid_json.encode(),
)
ch2 = writer.register_channel(
schema_id=schema2,
topic="grid",
message_encoding="json",
)
schema3 = writer.register_schema(
name="ros.sensor_msgs.PointCloud2",
encoding="jsonschema",
data=pointcloud2_json.encode(),
)
ch3 = writer.register_channel(
schema_id=schema3,
topic="pc2",
message_encoding="json",
)
schema4 = writer.register_schema(
name="foxglove.FrameTransform",
encoding="jsonschema",
data=frame_transform_json.encode(),
)
ch4 = writer.register_channel(
schema_id=schema4,
topic="transforms",
message_encoding="json",
)
frame_interval_ns = 50_000_000
nframes = 1000
for frame in tqdm(range(0, nframes)):
t = frame * frame_interval_ns
tt = t / 1_000_000_000
timestamp = {"sec": t // 1_000_000_000, "nsec": t % 1_000_000_000}
for i, child in enumerate(["cloud", "grid", "pc2"]):
q = Quaternion(axis=[0, 0, 1], angle=2 * math.pi * i / 3)
frame_transform = {
"timestamp": timestamp,
"parent_frame_id": "root",
"child_frame_id": child,
"translation": {"x": 0, "y": 0, "z": 0},
"rotation": {"x": q.x, "y": q.y, "z": q.z, "w": q.w},
}
writer.add_message(ch4, t, json.dumps(frame_transform).encode(), t)
point_stride = 7 * 4
height = 20
width = 20
def color(x, y):
r = 0.5 + 0.5 * (math.sin(2 * math.pi * tt + x / width))
g = y / height
b = 0.5
a = 0.5 + 0.5 * ((x / width) * (y / height))
return r, g, b, a
offset = 0
data = bytearray(width * height * point_stride)
for x in range(width):
for y in range(height):
r, g, b, a = color(x, y)
struct.pack_into(
"<fffBBBB",
data,
offset,
x + 0.2 * y / height * math.sin(2 * math.pi * (tt + y / height)),
y,
0,
round(r * 255),
round(g * 255),
round(b * 255),
round(a * 255),
)
offset += point_stride
fields = [
{"name": "x", "offset": 0, "type": 7},
{"name": "y", "offset": 4, "type": 7},
{"name": "z", "offset": 8, "type": 7},
{"name": "red", "offset": 12, "type": 1},
{"name": "green", "offset": 13, "type": 1},
{"name": "blue", "offset": 14, "type": 1},
{"name": "alpha", "offset": 15, "type": 1},
]
pointcloud = {
"timestamp": timestamp,
"frame_id": "cloud",
"pose": {"position": {"x": 0, "y": 0, "z": 0}, "orientation": {"x": 0, "y": 0, "z": 0, "w": 1}},
"point_stride": point_stride,
"fields": fields,
"data": base64.b64encode(data).decode(),
}
writer.add_message(ch1, t, json.dumps(pointcloud).encode(), t)
grid = {
"timestamp": timestamp,
"frame_id": "grid",
"pose": {"position": {"x": 0, "y": 0, "z": 0}, "orientation": {"x": 0, "y": 0, "z": 0, "w": 1}},
"column_count": width,
"row_count": height,
"cell_stride": point_stride,
"row_stride": width * point_stride,
"fields": fields,
"data": base64.b64encode(data).decode(),
}
writer.add_message(ch2, t, json.dumps(grid).encode(), t)
pc2_point_stride = 16
pc2_fields = [
{"name": "x", "offset": 0, "datatype": 7, "count": 1},
{"name": "y", "offset": 4, "datatype": 7, "count": 1},
{"name": "z", "offset": 8, "datatype": 7, "count": 1},
{"name": "rgba", "offset": 12, "datatype": 7, "count": 1},
]
pc2_data = bytearray(width * height * pc2_point_stride)
offset = 0
for x in range(width):
for y in range(height):
r, g, b, a = color(x, y)
struct.pack_into(
"<fffBBBB",
pc2_data,
offset,
x + 0.2 * y / height * math.sin(2 * math.pi * (tt + y / height)),
y,
0,
round(b * 255),
round(g * 255),
round(r * 255),
round(a * 255),
)
offset += pc2_point_stride
pc2 = {
"header": {"frame_id": "pc2", "stamp": timestamp},
"height": height,
"width": width,
"fields": pc2_fields,
"is_bigendian": False,
"point_step": pc2_point_stride,
"row_step": width * pc2_point_stride,
"data": base64.b64encode(pc2_data).decode(),
"is_dense": True,
}
writer.add_message(ch3, t, json.dumps(pc2).encode(), t)
writer.finish()
stream.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment