Skip to content

Instantly share code, notes, and snippets.

@auscompgeek
Created July 5, 2024 07:14
Show Gist options
  • Save auscompgeek/5bac481a7bfece2fb522d0b3aa588ee0 to your computer and use it in GitHub Desktop.
Save auscompgeek/5bac481a7bfece2fb522d0b3aa588ee0 to your computer and use it in GitHub Desktop.
Read PhotonVision multi-tag result reprojection errors from a WPILib data log.
import collections.abc
import datetime
import sys
import photonlibpy.packet
import photonlibpy.photonPipelineResult
import wpiutil.log
def main() -> None:
    """Print multi-tag reprojection errors found in a data log.

    Opens the log file named by the first command-line argument and walks
    every record yielded by ``read_log``. For each non-empty record belonging
    to one of the camera entries listed below, the payload is decoded as a
    PhotonVision pipeline result; when that result carries a multi-tag pose
    estimate, its best reprojection error and the camera label are written to
    stdout, separated by a tab.

    Raises:
        SystemExit: if no log file path was supplied on the command line.
        TypeError: if a camera record's value is not ``bytes``.
    """
    if len(sys.argv) < 2:
        # Exit with a readable hint rather than an IndexError traceback.
        sys.exit("usage: pass the path to a data log file as the first argument")

    log = wpiutil.log.DataLogReader(sys.argv[1])

    # Log entry name -> short label printed next to each value.
    cams = {
        "NT:/photonvision/ardu_cam_port/rawBytes": "port",
        "NT:/photonvision/ardu_cam_starboard/rawBytes": "starboard",
    }

    for _timestamp, entry, value in read_log(log):
        label = cams.get(entry.name)
        if label is None:
            continue

        # Explicit check instead of ``assert`` so it still runs under ``-O``.
        if not isinstance(value, bytes):
            raise TypeError(
                f"expected bytes for {entry.name!r}, got {type(value).__name__}"
            )
        if not value:
            # Empty payload: nothing to decode.
            continue

        result = photonlibpy.photonPipelineResult.PhotonPipelineResult()
        result.populateFromPacket(photonlibpy.packet.Packet(value))

        pose = result.multiTagResult.estimatedPose
        if pose.isPresent:
            print(pose.bestReprojError, label, sep="\t")
def read_log(
    log: wpiutil.log.DataLogReader,
) -> collections.abc.Iterable[tuple[float, wpiutil.log.DataLogEntry, object]]:
    """Yield ``(timestamp, entry, value)`` for each decodable data record.

    Iterates over *log*, keeping a table of the currently open entries (filled
    by start records, emptied by finish records). Data records are decoded
    according to the type string of their entry and yielded; control records
    only update the table and produce diagnostics.

    All diagnostics are written to stderr so that stdout stays free for the
    caller's own output.

    Args:
        log: The data log reader to iterate over.

    Yields:
        Tuples of ``(timestamp, entry, value)`` where ``timestamp`` is the
        record timestamp divided by 1,000,000 (presumably microseconds
        converted to seconds -- confirm against the log format), ``entry`` is
        the start-record data registered for the record's entry ID, and
        ``value`` is the decoded payload. For the ``systemTime`` ``int64``
        entry the value is a naive local ``datetime`` built from the integer
        payload divided by 1,000,000.

    NOTE(review): ``entry`` is whatever ``record.getStartData()`` returned;
    confirm that this matches the ``DataLogEntry`` named in the annotation.
    """
    # Entry type string -> name of the record method that decodes it.
    getters = {
        "double": "getDouble",
        "float": "getFloat",
        "int64": "getInteger",
        "string": "getString",
        "json": "getString",
        "boolean": "getBoolean",
        "boolean[]": "getBooleanArray",
        "double[]": "getDoubleArray",
        "float[]": "getFloatArray",
        "int64[]": "getIntegerArray",
        "string[]": "getStringArray",
        "rawBytes": "getRaw",
    }

    # Entry ID -> start-record data for every entry that is currently open.
    entries = {}

    for record in log:
        timestamp = record.getTimestamp() / 1_000_000

        if record.isStart():
            try:
                data = record.getStartData()
                if data.entry in entries:
                    print("...DUPLICATE entry ID, overriding", file=sys.stderr)
                entries[data.entry] = data
            except TypeError as e:
                print("Start(INVALID)", e, file=sys.stderr)
        elif record.isFinish():
            try:
                entry = record.getFinishEntry()
                print(f"Finish({entry}) [{timestamp}]", file=sys.stderr)
                if entry not in entries:
                    print("...ID not found", file=sys.stderr)
                else:
                    del entries[entry]
            except TypeError as e:
                print("Finish(INVALID)", e, file=sys.stderr)
        elif record.isSetMetadata():
            try:
                data = record.getSetMetadataData()
                # Diagnostics go to stderr like every other branch; printing
                # them to stdout would mix them into the caller's output.
                print(f"{data} [{timestamp}]", file=sys.stderr)
                if data.entry not in entries:
                    print("...ID not found", file=sys.stderr)
            except TypeError as e:
                print("SetMetadata(INVALID)", e, file=sys.stderr)
        elif record.isControl():
            print("Unrecognized control record", file=sys.stderr)
        else:
            entry = entries.get(record.getEntry())
            if entry is None:
                print("<ID not found>", file=sys.stderr)
                continue

            value: object
            try:
                if entry.name == "systemTime" and entry.type == "int64":
                    # systemTime is handled specially: expose it as a datetime.
                    value = datetime.datetime.fromtimestamp(
                        record.getInteger() / 1_000_000
                    )
                else:
                    getter = getters.get(entry.type)
                    if getter is None:
                        print("unhandled", entry.type, file=sys.stderr)
                        continue
                    value = getattr(record, getter)()
            except TypeError as e:
                # The record payload did not match the declared entry type.
                print(" invalid", e, file=sys.stderr)
                continue

            yield timestamp, entry, value
# Script entry point: run the report only when executed directly, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment