Skip to content

Instantly share code, notes, and snippets.

@jhurliman
Created August 30, 2023 21:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jhurliman/1fe7c807d9f1f442e5cebc924abc82b8 to your computer and use it in GitHub Desktop.
Save jhurliman/1fe7c807d9f1f442e5cebc924abc82b8 to your computer and use it in GitHub Desktop.
accad_txt_to_mcap.py
"""
Convert a tab-separated text file of timestamped body marker positions
from <https://accad.osu.edu/research/motion-lab/mocap-system-and-data> into an
MCAP file.
"""
import argparse
import csv
import json
import typing as tp
from pathlib import Path
from mcap.well_known import MessageEncoding, SchemaEncoding
from mcap.writer import Writer
def main():
    """Command-line entry point: convert a marker-position text file to MCAP.

    Parses the command line (positional input path, optional ``--output``),
    registers the ``foxglove.FrameTransforms`` JSON schema and a ``/tf``
    channel, then writes one JSON message per input row.  Every marker in a
    row becomes a translation-only transform (identity rotation) whose
    parent frame is ``"root"``.

    The schema is read from ``FrameTransforms.json`` next to this script.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("csv", help="The input CSV to read")
    parser.add_argument(
        "--output", "-o", default="out.mcap", help="The MCAP output path"
    )
    args = parser.parse_args()

    # Load the schema before opening the output: if the schema file is
    # missing we fail without creating or truncating the output file.
    schema = (Path(__file__).parent / "FrameTransforms.json").read_bytes()

    with open(args.output, "wb") as out_file:
        writer = Writer(out_file, chunk_size=4 * 1024 * 1024)
        writer.start()

        schema_id = writer.register_schema(
            name="foxglove.FrameTransforms",
            encoding=SchemaEncoding.JSONSchema,
            data=schema,
        )
        channel_id = writer.register_channel(
            topic="/tf",
            message_encoding=MessageEncoding.JSON,
            schema_id=schema_id,
        )

        for seq, seconds, fields in row_reader(args.csv):
            time_ns = sec_to_nsec(seconds)
            # All markers in a row share one timestamp; compute it once.
            timestamp = sec_to_timestamp(seconds)
            tfs = {
                "transforms": [
                    {
                        "timestamp": timestamp,
                        "parent_frame_id": "root",
                        "child_frame_id": frame,
                        "translation": {
                            "x": val[0],
                            "y": val[1],
                            "z": val[2],
                        },
                        # Only positions are available, so use the
                        # identity quaternion for every marker.
                        "rotation": {"x": 0, "y": 0, "z": 0, "w": 1},
                    }
                    for frame, val in fields.items()
                ]
            }
            writer.add_message(
                channel_id,
                log_time=time_ns,
                data=json.dumps(tfs).encode("utf-8"),
                publish_time=time_ns,
                sequence=seq,
            )

        writer.finish()
def row_reader(
    csv_path: tp.Union[str, Path],
) -> tp.Iterator[tp.Tuple[int, float, tp.Dict[str, tp.List[float]]]]:
    """Yield ``(seq, seconds, fields)`` for each row of a tab-separated file.

    The first line of the file is the header.  The ``Field`` column is parsed
    as the integer sequence number and the ``Time`` column as float seconds.
    Every other column must be named ``NAME:X``, ``NAME:Y`` or ``NAME:Z``
    (for example ``C7:X``); its value is divided by 1000 (millimeters to
    meters) and stored in ``fields[NAME]`` as an ``[x, y, z]`` list.

    Columns without a header name are skipped.  These appear when lines end
    with a trailing tab or a row has more cells than the header.

    Raises:
        ValueError: if a value cannot be parsed as a number or a named
            column is not of the form ``NAME:COORD``.
        KeyError: if the coordinate suffix is not ``X``, ``Y`` or ``Z``.
    """
    axis_index = {"X": 0, "Y": 1, "Z": 2}
    # newline="" lets the csv module do its own line-ending handling, as
    # recommended by the csv documentation.
    with open(csv_path, "r", newline="") as f:
        reader = csv.DictReader(f, delimiter="\t")
        for line in reader:
            seq = 0
            seconds = 0.0
            fields: tp.Dict[str, tp.List[float]] = {}
            for field, value in line.items():
                # Unnamed columns: "" from a trailing tab in the header,
                # None (DictReader's restkey) for surplus cells in a row.
                if field is None or not field.strip():
                    continue
                if field == "Field":
                    seq = int(value)
                elif field == "Time":
                    seconds = float(value)
                else:
                    # Field names are in the format "NAME:{X,Y,Z}" ie "C7:X"
                    name, coord = field.split(":")
                    position = fields.setdefault(name, [0, 0, 0])
                    # Convert from millimeters to meters
                    position[axis_index[coord]] = float(value) / 1000
            yield seq, seconds, fields
def sec_to_nsec(sec: float) -> int:
    """Convert a time in floating-point seconds to integer nanoseconds.

    The product is rounded to the nearest nanosecond instead of truncated.
    Binary floating point often lands just below the intended value
    (``1.001 * 1e9`` evaluates to ``1000999999.9999999``), so truncating
    would drop a whole nanosecond.
    """
    return round(sec * 1e9)
def sec_to_timestamp(sec: float) -> tp.Dict[str, int]:
    """Split floating-point seconds into a ``{"sec", "nsec"}`` timestamp dict.

    The input is first rounded to a whole number of nanoseconds and then
    split with ``divmod``.  Rounding avoids losing a nanosecond to float
    representation error (for example ``1.001`` yields ``nsec == 1000000``,
    not ``999999``), and ``divmod`` keeps ``nsec`` within ``[0, 10**9)``:
    a fraction that rounds up to a full second carries into ``sec``, and
    negative inputs get a non-negative ``nsec``.
    """
    whole_sec, nsec = divmod(round(sec * 1e9), 10**9)
    return {"sec": whole_sec, "nsec": nsec}
# Run the converter only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment