Skip to content

Instantly share code, notes, and snippets.

@vinaykumarhegde
Forked from yfyf/export_pose_graph.py
Created September 3, 2020 21:23
Show Gist options
  • Save vinaykumarhegde/20c2a1ee2d1f1bbb2aadac023456e7c6 to your computer and use it in GitHub Desktop.
Save vinaykumarhegde/20c2a1ee2d1f1bbb2aadac023456e7c6 to your computer and use it in GitHub Desktop.
Cartographer Pose Graph
#!/usr/bin/env python3
# Adapted from:
#
# https://gist.github.com/mjcarroll/f620ef8d88efc3a03711259d6fc9a578
#
# Original gist fixed to work with latest version of cartographer:
#
# [~/sensmetry/slam/cartographer:master]λ git describe --always
# bcd5486
# [~/slam/cartographer_ros:master]λ git describe --always
# 1de03b3
#
# Requirements:
# python3
# protobuf==3.*
#
# Preprocessing:
# cd cartographer
# # rename directories starting with a number - python
# # can't import those in a straightforward way
# find . -name '[0-9]*' -type d -exec rename 's/2d/i2d/' {} \;
# find . -name '[0-9]*' -type d -exec rename 's/3d/i3d/' {} \;
# find . -iname '*.proto' -exec sed -i 's|/2d/|/i2d/|' {} \;
# find . -iname '*.proto' -exec sed -i 's|/3d/|/i3d/|' {} \;
# mkdir py_export
# protoc --python_out ./py_export -I . `find . -iname '*.proto'`
import struct
import gzip
import argparse
import cartographer.mapping
from cartographer.mapping.proto.serialization_pb2 import SerializationHeader, SerializedData
from cartographer.mapping.proto.pose_graph_pb2 import PoseGraph
class PbstreamFile:
def __init__(self, pbstream_filename):
self.pbstream_filename = pbstream_filename
self.pbstream_file = open(pbstream_filename, 'rb')
assert(self.is_valid())
self.ser_data_by_type = {}
self.read_serialization_header()
while self.pbstream_file:
serdata = self.read_serialized_data()
if serdata:
serdata_type = serdata.WhichOneof('data')
data = getattr(serdata, serdata_type)
cur = self.ser_data_by_type.get(serdata_type, [])
cur.append(data)
self.ser_data_by_type[serdata_type] = cur
else:
break
def read_chunk(self):
sz = self.read_size()
if sz:
buf = self.pbstream_file.read(sz)
buf = gzip.decompress(buf)
return buf
def read_serialization_header(self):
buf = self.read_chunk()
if buf:
sh = SerializationHeader()
sh.ParseFromString(buf)
return sh
def read_serialized_data(self):
buf = self.read_chunk()
if buf:
sd = SerializedData()
sd.ParseFromString(buf)
return sd
def is_valid(self):
kMagic = [0x7b, 0x1d, 0x1f, 0x7b, 0x5b, 0xf5, 0x01, 0xdb]
self.pbstream_file.seek(0)
magic = self.pbstream_file.read(8)
return all([x == y for x, y in zip(kMagic[::-1], magic)])
def read_size(self):
buf = self.pbstream_file.read(8)
if len(buf) < 8:
return 0
return struct.unpack_from("<Q", buf)[0]
def pose_graph(self):
pose_graphs = self.ser_data_by_type['pose_graph']
if pose_graphs:
if len(pose_graphs) > 1:
print("Warning: multiple pose graphs parsed in Serialized Data "
"but only one should be present. Returning the first one.")
return pose_graphs[0]
else:
return None
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Process and export live SLAM results")
parser.add_argument('pbstream_filename', type=str)
args = parser.parse_args()
pbf = PbstreamFile(args.pbstream_filename)
pg = pbf.pose_graph()
print(pg)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment