-
-
Save patagonaa/a53db8cc37c6820023b2c51cf3a60869 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3 | |
import io | |
import logging | |
import os | |
import struct | |
import sys | |
import csv | |
from typing import BinaryIO | |
import matplotlib.pyplot as plt | |
def main():
    """Command-line entry point: ``<script> plot|csv <file>``.

    Reads the ``acgy`` and ``vrot`` payloads from the given file, decodes
    them, and either plots the result (``plot``) or writes it to CSV files
    next to the input file (``csv``).

    Exits with status 2 and a usage message on stderr when arguments are
    missing or the action is not one of the supported ones.
    """
    actions = ("plot", "csv")
    # Validate before touching the file so mistakes are reported immediately
    # instead of surfacing as an IndexError or as a silent no-op after parsing.
    if len(sys.argv) < 3 or sys.argv[1] not in actions:
        script = sys.argv[0] if sys.argv else "script"
        print(f"usage: {script} {'|'.join(actions)} <file>", file=sys.stderr)
        sys.exit(2)

    action = sys.argv[1]
    file = sys.argv[2]

    (acgy, vrot, framerate) = parseFile(file)
    acgyData = parseAcgy(acgy)
    vrotData = parseVrot(vrot, framerate)

    if action == "plot":
        plot(acgyData, vrotData)
    elif action == "csv":
        toCSV(acgyData, vrotData, file)
def parseFile(name: str):
    """Extract the raw sensor payloads and the video frame rate from a file.

    Looks for these boxes (the last match wins when a box occurs more than
    once):

    * ``moov -> udta -> acgy`` and ``moov -> udta -> vrot``: raw payloads
    * ``moov -> mvhd``: source of the time scale
    * ``moov -> trak (video) -> mdia -> minf -> stbl -> stts``: source of the
      per-sample duration

    Returns a tuple ``(acgy, vrot, frameRate)``. ``acgy`` and ``vrot`` are
    ``bytes`` or ``None`` when the box is absent; ``frameRate`` is
    ``timeScale / sampleDuration`` or ``None`` when ``mvhd`` or ``stts`` is
    missing. When the file does not start with an ``ftyp`` box an error is
    logged and ``(None, None, None)`` is returned.
    """
    acgy = None
    vrot = None
    mvhd = None
    stts = None

    with open(name, "rb") as f:
        root = getBoxes(f, 0, None)
        if not root or root[0].type != b"ftyp":
            logging.error("invalid file")
            return (None, None, None)

        for box in _findBoxes(root, b"moov", b"udta", b"acgy"):
            acgy = getBoxData(f, box)
        for box in _findBoxes(root, b"moov", b"udta", b"vrot"):
            vrot = getBoxData(f, box)
        for box in _findBoxes(root, b"moov", b"mvhd"):
            mvhd = getBoxData(f, box)
        for trak in _findBoxes(root, b"moov", b"trak"):
            if not trakIsVideo(f, trak):
                continue
            for box in _findBoxes(trak.children, b"mdia", b"minf", b"stbl", b"stts"):
                stts = getBoxData(f, box)

    frameRate = None
    if mvhd and stts:
        # NOTE(review): stts durations are defined in the media time scale
        # (mdhd), while this divides the movie time scale (mvhd). The result
        # is only right when both time scales are equal - confirm for the
        # files this is used with.
        frameRate = getMvhdTimeScale(mvhd) / getSttsSampleDuration(stts)

    return (acgy, vrot, frameRate)


def _findBoxes(boxes: "list[Box]", *path: bytes) -> "list[Box]":
    """Return, in file order, every box reached by matching *path* level by level.

    The first type in *path* is matched against *boxes* themselves, each
    following type against the children of the boxes matched so far.
    """
    candidates = list(boxes)
    for depth, boxType in enumerate(path):
        if depth:
            candidates = [child for box in candidates for child in box.children]
        candidates = [box for box in candidates if box.type == boxType]
    return candidates
class Box:
    """A single box found in the file, plus any child boxes parsed inside it."""

    def __init__(self, offset: int, length: int, type: bytes):
        # offset: absolute position of the box header in the file
        # length: size of the box as stored in its header (header included)
        # type:   the four-byte type code read from the header
        # children: nested boxes; filled in by the parser for container types
        self.children: list[Box] = []
        self.type = type
        self.length = length
        self.offset = offset

    def __repr__(self) -> str:
        fields = (self.offset, self.length, self.type, self.children)
        return "\n" + ", ".join(str(field) for field in fields) + "\n"
def trakIsVideo(data: BinaryIO, trak: Box) -> bool:
    """Return True when *trak* has an ``mdia -> hdlr`` box with subtype ``vide``.

    *data* is the open file the boxes were parsed from; it is only read when
    an ``hdlr`` box is present. A ``hdlr`` payload that is too short to hold
    a subtype is treated as "not video" instead of raising.
    """
    for trakChild in trak.children:
        if trakChild.type != b"mdia":
            continue
        for mdiaChild in trakChild.children:
            if mdiaChild.type != b"hdlr":
                continue
            boxData = getBoxData(data, mdiaChild)
            # Payload layout as consumed here: 4 bytes skipped, 4-byte
            # component type, then the 4-byte component subtype at [8:12].
            # Slicing (unlike struct.unpack) tolerates a truncated payload.
            if boxData[8:12] == b"vide":
                return True
    return False
def getMvhdTimeScale(boxData: bytes) -> int:
    """Return the time scale stored in an ``mvhd`` payload.

    *boxData* is the box payload (everything after the 8-byte box header).
    Version 0 (32-bit creation/modification times) and version 1 (64-bit
    creation/modification times) are supported.

    Raises ``Exception`` for any other version and ``struct.error`` when the
    payload is too short to contain the time scale.
    """
    version = boxData[0:1]
    if version == b"\0":
        # version + flags (4), creation time (4), modification time (4)
        (timeScale,) = struct.unpack_from("> 4x 4x 4x I", boxData)
    elif version == b"\1":
        # version + flags (4), creation time (8), modification time (8)
        (timeScale,) = struct.unpack_from("> 4x 8x 8x I", boxData)
    else:
        raise Exception("unhandled mvhd version")
    return timeScale
def getSttsSampleDuration(boxData: bytes) -> int:
    """Return the sample duration of the single entry in an ``stts`` payload.

    *boxData* is the box payload (everything after the 8-byte box header):
    version + flags, entry count, then ``(sampleCount, sampleDuration)``
    pairs.

    Raises ``Exception`` when the version is not 0 or when the table does not
    contain exactly one entry, and ``struct.error`` when the payload is
    truncated.
    """
    # Check the table header before touching the entry, so an empty table is
    # reported with the intended message rather than as a short-buffer error.
    (version, numEntries) = struct.unpack_from("> c 3x I", boxData)
    if version != b"\0":
        raise Exception("unhandled stts version")
    if numEntries != 1:
        raise Exception("stts table should only have one entry")
    # The entry starts at byte 8; skip its sample count and read the duration.
    (sampleDuration,) = struct.unpack_from("> 4x I", boxData, 8)
    return sampleDuration
def getBoxes(data: BinaryIO, offset: int, length: int|None) -> list[Box]:
    """Parse the sequence of boxes that starts at *offset* in *data*.

    *length* limits parsing to that many bytes (used for the payload of a
    container box); ``None`` means "until the end of the stream". Boxes of
    type ``moov``, ``udta``, ``trak``, ``mdia``, ``minf`` and ``stbl`` are
    parsed recursively and get their ``children`` filled in.

    Parsing stops quietly at the first thing that cannot be a box: a
    truncated header, fewer than 8 bytes left inside the container, or a
    size smaller than the header itself.

    Returns the boxes in file order. ``Box.length`` always holds the full
    box size, also for boxes that use the 64-bit size field or the
    "extends to the end" size of 0.
    """
    containerTypes = (b"moov", b"udta", b"trak", b"mdia", b"minf", b"stbl")
    boxes = []
    end = offset + length if length is not None else None
    data.seek(offset, os.SEEK_SET)
    while end is None or offset < end:
        # Some containers end with a few padding/terminator bytes (e.g. a
        # 4-byte zero after the last udta entry); never read a header across
        # the container boundary.
        if end is not None and end - offset < 8:
            break
        header = data.read(8)
        if len(header) < 8:
            break
        (boxLength, boxType) = struct.unpack("> I 4s", header)
        headerSize = 8
        if boxLength == 1:
            # Size 1 means the real size follows the type as a 64-bit value.
            largeSize = data.read(8)
            if len(largeSize) < 8:
                break
            (boxLength,) = struct.unpack("> Q", largeSize)
            headerSize = 16
        elif boxLength == 0:
            # Size 0 means the box runs to the end of its container / the file.
            limit = end if end is not None else data.seek(0, os.SEEK_END)
            boxLength = limit - offset
        if boxLength < headerSize:
            # Corrupt size: advancing by it would loop forever or go backwards.
            break
        box = Box(offset, boxLength, boxType)
        if boxType in containerTypes:
            box.children = getBoxes(data, offset + headerSize, boxLength - headerSize)
        boxes.append(box)
        offset += boxLength
        data.seek(offset, os.SEEK_SET)
    return boxes
def getBoxData(data: BinaryIO, box: Box) -> bytes: | |
data.seek(box.offset + 8, os.SEEK_SET) | |
return data.read(box.length - 8) | |
def parseAcgy(acgy: bytes) -> list[tuple]:
    """Decode an ``acgy`` payload into scaled sensor samples.

    The first 24 bytes are skipped; the rest is read as big-endian 20-byte
    records (one 64-bit timestamp followed by six signed 16-bit values). A
    trailing partial record is ignored, and ``None`` or an empty payload
    yields an empty list.

    Returns one tuple per record:
    ``(ts / 1e6, x, y, z, roll, pitch, yaw)`` with the first three values
    multiplied by the accelerometer factor and the last three by the gyro
    factor.
    """
    # scale factors from the LSM6DS3 datasheet
    accelFactor = 0.122 / 1000
    gyroFactor = 70 / 1000

    record = struct.Struct(">qhhhhhh")
    payload = (acgy or b"")[24:]
    # iter_unpack needs a whole number of records; drop an incomplete tail.
    wholeBytes = len(payload) - len(payload) % record.size

    samples = []
    for (ts, x, y, z, roll, pitch, yaw) in record.iter_unpack(payload[:wholeBytes]):
        samples.append((
            ts / 1e6,  # presumably microseconds -> seconds; verify against the device
            x * accelFactor,
            y * accelFactor,
            z * accelFactor,
            roll * gyroFactor,
            pitch * gyroFactor,
            yaw * gyroFactor,
        ))
    return samples
def parseVrot(vrot: bytes, framerate: float) -> list[tuple]:
    """Decode a ``vrot`` payload into per-frame rotation values.

    The payload is read as big-endian 24-byte records of six signed 32-bit
    integers, taken as three numerator/denominator pairs. A trailing partial
    record is ignored, and ``None`` or an empty payload yields an empty list.

    Returns one tuple per record: ``(index / framerate, roll, pitch, yaw)``
    where roll comes from the first pair (values above 180 have 360
    subtracted), yaw from the second pair and pitch from the third.
    """
    record = struct.Struct(">iiiiii")
    payload = vrot or b""
    # iter_unpack needs a whole number of records; drop an incomplete tail.
    wholeBytes = len(payload) - len(payload) % record.size

    rotations = []
    for index, fields in enumerate(record.iter_unpack(payload[:wholeBytes])):
        (rollNum, rollDen, yawNum, yawDen, pitchNum, pitchDen) = fields
        roll = rollNum / rollDen
        if roll > 180:
            # presumably degrees: fold the upper half-turn into negative values
            roll = roll - 360
        pitch = pitchNum / pitchDen
        yaw = yawNum / yawDen
        rotations.append((index / framerate, roll, pitch, yaw))
    return rotations
def plot(acgyData: list[tuple], rotData: list[tuple]):
    """Show three stacked plots sharing the x axis.

    Top: columns 1-3 of *acgyData*; middle: columns 4-6 of *acgyData*;
    bottom: columns 1-3 of *rotData*. Column 0 of each data set is used as
    the x value. Blocks until the plot window is closed (``plt.show()``).
    """
    def column(rows, index):
        return [row[index] for row in rows]

    (_figure, (accelPlot, gyroPlot, rotdataPlot)) = plt.subplots(3, sharex=True)

    # (axes, data set, ((column index, legend label), ...)) - drawn in this order
    layout = (
        (accelPlot, acgyData, ((1, "acgydata_x"), (2, "acgydata_y"), (3, "acgydata_z"))),
        (gyroPlot, acgyData, ((4, "acgydata_roll"), (5, "acgydata_pitch"), (6, "acgydata_yaw"))),
        (rotdataPlot, rotData, ((1, "rotdata_roll"), (2, "rotdata_pitch"), (3, "rotdata_yaw"))),
    )
    for (axes, rows, series) in layout:
        for (index, label) in series:
            axes.plot(column(rows, 0), column(rows, index), label=label)
        axes.legend()

    plt.show()
def toCSV(acgyData: list[tuple]|None, rotData: list[tuple]|None, inputFile: str): | |
if acgyData: | |
with open(inputFile+".acgydata.csv", "w", newline="") as f: | |
writer = csv.writer(f, delimiter=";") | |
writer.writerow(["timestamp", "x", "y", "z", "roll", "pitch", "yaw"]) | |
writer.writerows(acgyData) | |
if rotData: | |
with open(inputFile+".rotdata.csv", "w", newline="") as f: | |
writer = csv.writer(f, delimiter=";") | |
writer.writerow(["timestamp", "roll", "pitch", "yaw"]) | |
writer.writerows(rotData) | |
# Run only when executed as a script, so importing this module has no side effects.
if __name__ == "__main__":
    main()
Do you know if such data is saved when recording with one lens? The files are then named SAM_ instead of 360_
Idk, try it out if you need it?
Also that telemetry parser you linked does not seem to support the Samsung Gear format, but it could probably be implemented pretty easily
Do you know if such data is saved when recording with one lens? The files are then named SAM_ instead of 360_
Idk, try it out if you need it? Also that telemetry parser you linked does not seem to support the Samsung Gear format, but it could probably be implemented pretty easily
I tried it, and your code only works with 360_ files.
I can share a sample video: https://artrix.eu/SAM_0167.MP4
I had a look at the sample video; it doesn't seem to contain the accelerometer/gyro data in the metadata (that data would be in moov->udta->acgy/vrot). You can view all the metadata with something like https://www.onlinemp4parser.com/
Do you know if such data is saved when recording with one lens? The files are then named SAM_ instead of 360_
Stabilization software exists and is popular (https://github.com/gyroflow/gyroflow), but it does not support 360 video. It uses https://github.com/AdrianEddy/telemetry-parser to extract data from recordings.