Skip to content

Instantly share code, notes, and snippets.

@jtbandes
Last active September 9, 2022 18:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jtbandes/b1c5f674bea74b2365d512e66f2e557d to your computer and use it in GitHub Desktop.
Save jtbandes/b1c5f674bea74b2365d512e66f2e557d to your computer and use it in GitHub Desktop.
import colorsys
import base64
import math
import json
import requests
from tqdm import tqdm
from time import time_ns
from pyquaternion import Quaternion
from mcap.mcap0.writer import Writer
def _fetch_schema(url):
    """Download a JSON schema and return the response body as text.

    Raises ``requests.HTTPError`` on a non-success status so that an error
    page is never mistaken for a schema, and uses a timeout so a stalled
    connection cannot hang the script indefinitely.
    """
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    return response.text


# Both schema URLs embed the same revision hash of the foxglove/schemas repository.
scene_update_json = _fetch_schema(
    "https://github.com/foxglove/schemas/raw/94898bfc24f9629b42233497d386181fe5afb92b/schemas/jsonschema/SceneUpdate.json"
)
frame_transform_json = _fetch_schema(
    "https://github.com/foxglove/schemas/raw/94898bfc24f9629b42233497d386181fe5afb92b/schemas/jsonschema/FrameTransform.json"
)
# Write "string_enum.mcap": one JSON channel ("values") carrying {"data": <str>}
# messages, five consecutive messages for each entry of `values`.
with open("string_enum.mcap", "wb") as stream:
    writer = Writer(stream)
    writer.start()

    string_schema = {"type": "object", "properties": {"data": {"type": "string"}}}
    schema = writer.register_schema(
        name="str",
        encoding="jsonschema",
        data=json.dumps(string_schema).encode(),
    )
    channel = writer.register_channel(
        schema_id=schema,
        topic="values",
        message_encoding="json",
    )

    values = ["l", "a", "l", "a"]
    for i, x in enumerate(values):
        # The payload depends only on the current value, so encode it once per value.
        payload = json.dumps({"data": x}).encode()
        for j in range(5):
            t = i * 5 + j
            # The same stamp is used for both time arguments; consecutive
            # messages are 10_000_000 apart (presumably nanoseconds, i.e. 10 ms).
            stamp = t * 10_000_000
            writer.add_message(channel, stamp, payload, stamp)

    writer.finish()
# Name of the primitive list to generate; it selects the branch taken by the
# primitive builders and is also used as the output file name.
primitive_type = "texts"

_avocado_url = "https://assets.foxglove.dev/Avocado.glb"

# Raw bytes of the avocado model. In the visible code this is referenced only by
# the commented-out embedded-data entry in `models` below.
avocado_data = requests.get(_avocado_url).content

# Each entry is (model source, scale factor). The source is either a URL string
# or a (data, media_type) pair, as accepted by model_info().
models = [
    ("https://assets.foxglove.dev/NuScenes_car_uncompressed.glb", 0.5),
    # ((base64.b64encode(avocado_data).decode("ascii"), "model/gltf-binary"), 40),
    (_avocado_url, 40),
]
def make_color(f):
    """Return an RGBA dict for the value ``f``.

    ``f`` is used as the hue of a fully saturated, full-value HSV color, and
    the same ``f`` is also stored as the alpha component.
    """
    red, green, blue = colorsys.hsv_to_rgb(f, 1, 1)
    return dict(r=red, g=green, b=blue, a=f)
def model_info(url_or_data):
    """Return the dict fields that describe where a model comes from.

    A ``str`` argument is treated as a URL and yields ``{"url": ...}``.
    Anything else is indexed as a ``(data, media_type)`` pair and yields
    ``{"data": ..., "media_type": ...}``.
    """
    if not isinstance(url_or_data, str):
        data = url_or_data[0]
        media_type = url_or_data[1]
        return {"data": data, "media_type": media_type}
    return {"url": url_or_data}
# Generate "<primitive_type>.mcap": an animated scene made of the selected
# primitive type, written as JSON messages on three channels:
#   * "scene updates"   - two entities re-sent every frame (a row of animated
#                         shapes in frame "entity" and a 20x20 grid in frame "scene")
#   * "scene updates 2" - one new entity every 40th frame, in frame "entity2"
#   * "transforms"      - scene->entity and scene->entity2 transforms every frame
with open(f"{primitive_type}.mcap", "wb") as stream:
    writer = Writer(stream)
    # The library argument helps identify what tool wrote the file.
    writer.start(profile="", library="")

    schema1 = writer.register_schema(
        name="foxglove.SceneUpdate",
        encoding="jsonschema",
        data=scene_update_json.encode(),
    )
    ch1 = writer.register_channel(
        schema_id=schema1,
        topic="scene updates",
        message_encoding="json",
    )
    ch3 = writer.register_channel(
        schema_id=schema1,
        topic="scene updates 2",
        message_encoding="json",
    )
    schema2 = writer.register_schema(
        name="foxglove.FrameTransform",
        encoding="jsonschema",
        data=frame_transform_json.encode(),
    )
    ch2 = writer.register_channel(
        schema_id=schema2,
        topic="transforms",
        message_encoding="json",
    )

    frame_interval_ns = 50_000_000
    nframes = 1000
    nshapes = 20

    for frame in tqdm(range(0, nframes)):
        t = frame * frame_interval_ns

        def make_primitive(i):
            """Build the ``i``-th animated primitive for the current frame.

            Returns a dict shaped for the current ``primitive_type``, or
            ``None`` (implicitly) when ``primitive_type`` matches no branch.
            """
            r, g, b = colorsys.hsv_to_rgb(i / nshapes + frame / nframes, 1, 1)
            a = min(1, i / nshapes + 2 * frame / nframes)
            axis = [1, 0, 0]
            if primitive_type == "arrows":
                axis = [0, 1, 0]
            q = Quaternion(axis=axis, angle=2 * math.pi * (i / nshapes + frame / nframes))
            pose = {
                "position": {"x": i, "y": math.sin(i * 2 * math.pi * frame / nframes), "z": 0},
                "orientation": {"x": q.x, "y": q.y, "z": q.z, "w": q.w},
            }
            if primitive_type in ("cubes", "spheres"):
                return {
                    "pose": pose,
                    "size": {"x": 1, "y": 0.5 + 0.25 * math.sin(i * 2 * math.pi * frame / nframes), "z": 1},
                    "color": {"r": r, "g": g, "b": b, "a": a},
                }
            elif primitive_type == "models":
                model = models[i % len(models)]
                return {
                    "pose": pose,
                    "scale": {
                        "x": 1 * model[1],
                        "y": (0.5 + 0.25 * math.sin(i * 2 * math.pi * frame / nframes)) * model[1],
                        "z": 1 * model[1],
                    },
                    "color": {"r": r, "g": g, "b": b, "a": a},
                    **model_info(model[0]),
                }
            elif primitive_type == "lines":
                scale_invariant = i % 2 == 0
                return {
                    "type": i % 3,
                    "pose": pose,
                    "thickness": (0.5 + 0.25 * math.sin(i * 2 * math.pi * frame / nframes))
                    * (10 if scale_invariant else 0.5),
                    "scale_invariant": scale_invariant,
                    "points": [
                        {
                            "x": math.cos(2 * math.pi * (j / 10 + frame / nframes)),
                            "y": math.sin(2 * math.pi * (j / 10 + 1.5 * frame / nframes)),
                            "z": j / 10,
                        }
                        for j in range(10)
                    ],
                    "color": {"r": r, "g": g, "b": b, "a": a},
                    # Only every third line carries per-point colors.
                    "colors": [make_color(j / 10) for j in range(10)] if i % 3 == 0 else [],
                    "indices": [],
                }
            elif primitive_type == "arrows":
                return {
                    "pose": pose,
                    "shaft_length": 0.6 + 0.25 * math.sin(i + 2 * math.pi * frame / nframes),
                    "shaft_diameter": 0.1 + 0.25 * math.sin(i + 2 * math.pi * frame / nframes),
                    "head_length": 0.4 + 0.25 * math.sin(i + 2 * math.pi * frame / nframes),
                    "head_diameter": 0.3 + 0.25 * math.sin(i + 2 * math.pi * frame / nframes),
                    "color": {"r": r, "g": g, "b": b, "a": a},
                }
            elif primitive_type == "cylinders":
                return {
                    "pose": pose,
                    "size": {"x": 1, "y": 0.5 + 0.25 * math.sin(i * 2 * math.pi * frame / nframes), "z": 1},
                    "color": {"r": r, "g": g, "b": b, "a": a},
                    "top_scale": 0.25 + 0.25 * math.sin(i * 2 * math.pi * frame / nframes),
                    "bottom_scale": 0.25 + 0.25 * math.cos(i * 2 * math.pi * frame / nframes),
                }
            elif primitive_type == "texts":
                scale_invariant = i % 5 == 0
                return {
                    "pose": pose,
                    "color": {"r": r, "g": g, "b": b, "a": a},
                    "font_size": (0.25 + 0.25 * math.sin(i * 2 * math.pi * frame / nframes))
                    * (20 if scale_invariant else 1),
                    "billboard": i % 2 == 0 or scale_invariant,
                    "scale_invariant": scale_invariant,
                    "text": f"{i}",
                }

        timestamp = {"sec": t // 1_000_000_000, "nsec": t % 1_000_000_000}
        # Number of shapes shown oscillates between 0 and nshapes, four cycles per run.
        nshapes_visible = int(nshapes * (1 + math.sin(2 * math.pi * 4 * frame / nframes)) / 2)

        def make_primitive2(x, y, w, h):
            """Build the primitive for cell ``(x, y)`` of a ``w`` by ``h`` grid.

            Returns a dict shaped for the current ``primitive_type``, or
            ``None`` (implicitly) when ``primitive_type`` matches no branch.
            """
            axis = [1, 0, 0]
            if primitive_type == "arrows":
                axis = [0, 1, 0]
            q = Quaternion(axis=axis, angle=2 * math.pi * (x / 5 + y / 5 + 0.5 * t / 1_000_000_000 * x / w))
            pose = {
                "position": {"x": x / w * 5 - 5, "y": y / h * 5 - 5, "z": 0},
                "orientation": {"x": q.x, "y": q.y, "z": q.z, "w": q.w},
            }
            # Color depends only on the grid position, so it is shared by all branches.
            color = {"r": x / w, "g": x / w, "b": 0.2 + x / w * 0.8, "a": y / h}
            if primitive_type in ("cubes", "spheres"):
                return {
                    "pose": pose,
                    "size": {"x": 3 / w, "y": 3 / h, "z": 0.05},
                    "color": color,
                }
            elif primitive_type == "models":
                model = models[0]
                return {
                    "pose": pose,
                    "scale": {"x": (3 / w) * model[1], "y": (3 / h) * model[1], "z": 0.05 * model[1]},
                    "color": color,
                    **model_info(model[0]),
                }
            elif primitive_type == "lines":
                # This function has no `i` parameter; an unqualified `i` would
                # resolve to whatever a module-level `i` happens to hold (or
                # raise NameError). Use the cell's flat index instead, matching
                # the label used by the "texts" branch.
                i = y * w + x
                scale_invariant = i % 2 == 0
                return {
                    "type": i % 3,
                    "pose": pose,
                    "thickness": (0.5 + 0.25 * math.sin(i * 2 * math.pi * frame / nframes))
                    * (10 if scale_invariant else 0.5),
                    "scale_invariant": scale_invariant,
                    "points": [
                        {
                            "x": 3 / w * math.cos(2 * math.pi * (j / 10 + frame / nframes)),
                            "y": 3 / h * math.sin(2 * math.pi * (j / 10 + 1.5 * frame / nframes)),
                            "z": 0.05 * j / 10,
                        }
                        for j in range(10)
                    ],
                    "color": color,
                    "colors": [make_color(j / 10) for j in range(10)] if i % 3 == 0 else [],
                    "indices": [],
                }
            elif primitive_type == "arrows":
                return {
                    "pose": pose,
                    "shaft_length": 0.5 * 3 / w,
                    "shaft_diameter": 0.3 * 3 / w,
                    "head_length": 0.5 * 3 / w,
                    "head_diameter": 0.5 * 3 / w,
                    "color": color,
                }
            elif primitive_type == "cylinders":
                return {
                    "pose": pose,
                    "size": {"x": 3 / w, "y": 3 / h, "z": 0.05},
                    "color": color,
                    "top_scale": 0.75 + 0.25 * math.sin(x + 2 * math.pi * frame / nframes),
                    "bottom_scale": 0.75 + 0.25 * math.cos(y + 2 * math.pi * frame / nframes),
                }
            elif primitive_type == "texts":
                scale_invariant = y > w - x
                billboard = y > x
                return {
                    "pose": pose,
                    "color": color,
                    "font_size": 0.15 * (60 if billboard and scale_invariant else 1),
                    "billboard": billboard,
                    "scale_invariant": scale_invariant,
                    "text": f"{y*w + x}",
                }

        scene_update = {
            "entities": [
                {
                    "timestamp": timestamp,
                    "frame_id": "entity",
                    "id": f"my{primitive_type}",
                    # "lifetime": {"sec": 0, "nsec": 0},
                    "frame_locked": False,
                    primitive_type: [make_primitive(i) for i in range(nshapes_visible)],
                },
                {
                    "timestamp": timestamp,
                    "frame_id": "scene",
                    "id": f"my{primitive_type}2",
                    # "lifetime": {"sec": 0, "nsec": 0},
                    "frame_locked": False,
                    primitive_type: [make_primitive2(x, y, 20, 20) for x in range(20) for y in range(20)],
                },
            ]
        }
        writer.add_message(ch1, t, json.dumps(scene_update).encode(), t)

        # Every 40th frame, publish one additional single-primitive entity on
        # the second scene-update channel.
        if frame % 40 == 0:
            i = frame // 40
            r, g, b = colorsys.hsv_to_rgb(i / 10, 1, 1)
            q = Quaternion()
            pose = {
                "position": {"x": i, "y": 0, "z": 0},
                "orientation": {"x": q.x, "y": q.y, "z": q.z, "w": q.w},
            }
            if primitive_type in ("cubes", "spheres"):
                primitive = {
                    "pose": pose,
                    "size": {"x": 1, "y": 0.5 + 0.25 * math.sin(i * 2 * math.pi * frame / nframes), "z": 1},
                    "color": {"r": r, "g": g, "b": b, "a": 1},
                }
            elif primitive_type == "models":
                model = models[i % len(models)]
                primitive = {
                    "pose": pose,
                    "scale": {
                        "x": 1 * model[1],
                        "y": (0.5 + 0.25 * math.sin(i * 2 * math.pi * frame / nframes)) * model[1],
                        "z": 1 * model[1],
                    },
                    "color": {"r": r, "g": g, "b": b, "a": 1},
                    **model_info(model[0]),
                    "override_color": i % 3 != 0,
                }
            elif primitive_type == "lines":
                scale_invariant = i % 2 == 0
                m = 0.5 + 0.25 * math.sin(i * 2 * math.pi * frame / nframes)
                primitive = {
                    "type": i % 3,
                    "pose": pose,
                    "thickness": (0.5 + 0.25 * m) * (10 if scale_invariant else 0.5),
                    "scale_invariant": scale_invariant,
                    "points": [
                        {
                            "x": math.cos(2 * math.pi * (j / 10 + frame / nframes)),
                            "y": m * math.sin(2 * math.pi * (j / 10 + 1.5 * frame / nframes)),
                            "z": j / 10,
                        }
                        for j in range(10)
                    ],
                    "color": {"r": r, "g": g, "b": b, "a": 1},
                    "colors": [make_color(j / 10) for j in range(10)] if i % 3 == 0 else [],
                    "indices": [],
                }
            elif primitive_type == "arrows":
                primitive = {
                    "pose": pose,
                    "shaft_length": 0.6 + 0.25 * math.sin(i + 2 * math.pi * frame / nframes),
                    "shaft_diameter": 0.1 + 0.25 * math.sin(i + 2 * math.pi * frame / nframes),
                    "head_length": 0.4 + 0.25 * math.sin(i + 2 * math.pi * frame / nframes),
                    "head_diameter": 0.3 + 0.25 * math.sin(i + 2 * math.pi * frame / nframes),
                    "color": {"r": r, "g": g, "b": b, "a": 1},
                }
            elif primitive_type == "cylinders":
                primitive = {
                    "pose": pose,
                    "size": {"x": 1, "y": 0.5 + 0.25 * math.sin(i * 2 * math.pi * frame / nframes), "z": 1},
                    "color": {"r": r, "g": g, "b": b, "a": 1},
                    "top_scale": 0.25 + 0.25 * math.sin(i * 2 * math.pi * frame / nframes),
                    "bottom_scale": 0.25 + 0.25 * math.cos(i * 2 * math.pi * frame / nframes),
                }
            elif primitive_type == "texts":
                primitive = {
                    "pose": pose,
                    "color": {"r": r, "g": g, "b": b, "a": 1},
                    "font_size": 0.2 + 0.1 * math.sin(i * 2 * math.pi * frame / nframes),
                    "billboard": i % 3 != 0,
                    "scale_invariant": False,
                    "text": "Hello\nWorld",
                }
            scene_update2 = {
                # The trailing `and False` keeps this condition permanently false,
                # so no deletions are emitted; presumably a manual toggle.
                "deletions": [
                    {"timestamp": timestamp, "type": 1 if i % 10 == 0 else 0, "id": f"my{primitive_type}-{i-1}"}
                ]
                if i > 0 and i % 2 == 0 and False
                else [],
                "entities": [
                    {
                        "timestamp": timestamp,
                        "frame_id": "entity2",
                        "id": f"my{primitive_type}-{i}",
                        "frame_locked": i % 2 == 0,
                        # "lifetime": {"sec": 5, "nsec": 0},
                        primitive_type: [
                            primitive,
                        ],
                    },
                ],
            }
            writer.add_message(ch3, t, json.dumps(scene_update2).encode(), t)

        # Both child frames share one rotation about z: a full turn every 30
        # units of t / 1e9 (the same quantity used as "sec" in `timestamp`).
        q = Quaternion(axis=[0, 0, 1], angle=2 * math.pi * t / 1_000_000_000 / 30)
        rotation = {"x": q.x, "y": q.y, "z": q.z, "w": q.w}

        frame_transform = {
            "timestamp": timestamp,
            "parent_frame_id": "scene",
            "child_frame_id": "entity",
            "translation": {"x": 0, "y": 0, "z": 0},
            "rotation": rotation,
        }
        writer.add_message(ch2, t, json.dumps(frame_transform).encode(), t)

        frame_transform = {
            "timestamp": timestamp,
            "parent_frame_id": "scene",
            "child_frame_id": "entity2",
            "translation": {"x": 3, "y": 4, "z": 0},
            "rotation": rotation,
        }
        writer.add_message(ch2, t, json.dumps(frame_transform).encode(), t)

    writer.finish()
    # No explicit stream.close(): the `with` statement closes the file on exit.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment