Skip to content

Instantly share code, notes, and snippets.

@harlowja
Created July 14, 2019 19:56
Show Gist options
  • Save harlowja/405e255065f3236b4d0db5a8d94d1825 to your computer and use it in GitHub Desktop.
Save harlowja/405e255065f3236b4d0db5a8d94d1825 to your computer and use it in GitHub Desktop.
void Bag::reindexChunk(uint64_t chunk_pos) {
map<uint32_t, multiset<IndexEntry> >::iterator cit;
std::map<uint32_t, uint32_t>::iterator it;
ChunkHeader chunk_header;
ChunkInfo chunk_info;
bool chunk_ok = false;
uint32_t data_size = 0;
uint8_t op = 0xff;
readChunkHeader(chunk_header);
if (chunk_header.compressed_size == 0) {
throw BagFormatException("Unterminated chunk");
}
uint64_t offset = file_.getOffset() - chunk_pos;
while (offset < chunk_header.uncompressed_size) {
op = 0xff;
peekNextHeaderOp(op);
if (!isValidOp(op)) {
throw BagFormatException("Could not peek/read a valid 'op'");
}
if (op == OP_MSG_DATA) {
ros::Header header;
if(!readHeader(header)) {
throw BagFormatException("Could not read required OP_MSG_DATA header");
}
M_string& fields = *header.getValues();
uint32_t connection_id;
if(!readField(fields, CONNECTION_FIELD_NAME, true, &connection_id)) {
throw BagFormatException("Could not read required 'conn' header value");
}
ros::Time t;
if(!readField(fields, TIME_FIELD_NAME, true, &t)) {
throw BagFormatException("Could not read required 'time' header value");
}
if (!chunk_ok) {
chunk_info.pos = chunk_pos;
chunk_info.start_time = t;
chunk_info.end_time = t;
chunk_ok = true;
}
else {
if (t > chunk_info.end_time) {
chunk_info.end_time = t;
}
else if (t < chunk_info.start_time) {
chunk_info.start_time = t;
}
}
it = chunk_info.connection_counts.find(connection_id);
if (it == chunk_info.connection_counts.end()) {
chunk_info.connection_counts[connection_id] = 1;
}
else {
it->second = it->second + 1;
}
if(!readDataLength(data_size)) {
throw BagFormatException("Could not read message data length");
}
seek(data_size, std::ios::cur);
IndexEntry index_entry;
index_entry.time = t;
index_entry.chunk_pos = chunk_pos;
index_entry.offset = offset;
cit = connection_indexes_.find(connection_id);
if (cit == connection_indexes_.end()) {
throw BagFormatException("Connection id not not preceded by connection record");
} else {
cit->second.insert(index_entry);
connection_indexes_count_++;
}
} else if (op == OP_CONNECTION) {
ConnectionInfo* ci = readConnectionRecord();
cit = connection_indexes_.find(ci->id);
if (cit == connection_indexes_.end()) {
connection_indexes_[ci->id] = multiset<IndexEntry>();
}
}
else {
if(!readDataLength(data_size)) {
throw BagFormatException("Could not read data length");
}
seek(data_size, std::ios::cur);
if(!readDataLength(data_size)) {
throw BagFormatException("Could not read data length");
}
seek(data_size, std::ios::cur);
}
offset = file_.getOffset() - chunk_pos;
}
offset = file_.getOffset();
if (offset < file_size_) {
data_size = 0;
op = 0xff;
peekNextHeaderOp(op);
if (!isValidOp(op)) {
throw BagFormatException("Could not peek/read a valid 'op'");
}
while (op != OP_CHUNK) {
offset = file_.getOffset();
if (offset >= file_size_) {
break;
}
if (op == OP_INDEX_DATA) {
ros::Header header;
if (!readHeader(header)) {
throw BagFormatException("Error reading INDEX_DATA header");
}
M_string& fields = *header.getValues();
uint32_t count;
if(!readField(fields, COUNT_FIELD_NAME, true, &count)) {
throw BagFormatException("Error reading INDEX_DATA header count value");
}
uint32_t v;
if(!readField(fields, VER_FIELD_NAME, true, &v)) {
throw BagFormatException("Error reading INDEX_DATA header ver value");
}
if (v != 1) {
throw BagFormatException("Expecting index version 1");
}
uint32_t data_size;
if (!readDataLength(data_size)) {
throw BagFormatException("Error reading INDEX_DATA size");
}
uint64_t time;
uint32_t offset;
for (uint32_t i = 0; i < count; ++i) {
read((char*) &time, 8);
read((char*) &offset, 4);
}
}
else {
if(!readDataLength(data_size)) {
throw BagFormatException("Could not read data length");
}
seek(data_size, std::ios::cur);
if(!readDataLength(data_size)) {
throw BagFormatException("Could not read data length");
}
seek(data_size, std::ios::cur);
}
op = 0xff;
peekNextHeaderOp(op);
if (!isValidOp(op)) {
throw BagFormatException("Could not peek/read a valid 'op'");
}
}
}
if (chunk_ok) {
chunks_.push_back(chunk_info);
}
connection_count_ = connections_.size();
chunk_count_ = chunks_.size();
}
void Bag::reindex(string const& filename, FILE* f, uint64_t max_size, bool expand) {
if (file_.isOpen()) {
throw BagException("Bag is already open");
}
file_.openWriteFile(filename, f);
readVersion();
if (version_ != 200) {
throw BagException((format("Bag file version %1%.%2% is unsupported for reindexing") % getMajorVersion() % getMinorVersion()).str());
}
if (compression_ != compression::Uncompressed) {
throw BagException("Bag reindexing currently only works for uncompressed bags (use the python implementation for other ones)");
}
ros::Header header;
uint32_t data_size = 0;
if (!readHeader(header) || !readDataLength(data_size)) {
throw BagFormatException("Error reading FILE_HEADER record");
}
M_string& fields = *header.getValues();
if (!isOp(fields, OP_FILE_HEADER)) {
throw BagFormatException("Expected FILE_HEADER op not found");
}
readField(fields, INDEX_POS_FIELD_NAME, true, (uint64_t*) &index_data_pos_);
if (index_data_pos_ != 0) {
file_.close();
return;
}
// Skip the data section (just padding)
seek(data_size, std::ios::cur);
// Determine max size.
uint64_t pos = file_.getOffset();
seek(0, std::ios::end);
file_size_ = file_.getOffset();
seek(pos);
// Basically this is moving from python into here,
// https://github.com/strawlab/ros_comm/blob/master/tools/rosbag/src/rosbag/bag.py#L1871
// Maybe someday we can submit this for https://github.com/ros/ros_comm/issues/1353
uint64_t trunc_pos = 0;
bool needs_repair = false;
while (true) {
uint64_t offset = file_.getOffset();
if (offset >= file_size_) {
break;
}
trunc_pos = offset;
try {
reindexChunk(offset);
}
catch (const BagFormatException &e) {
needs_repair = true;
break;
}
catch (const BagIOException &e) {
needs_repair = true;
break;
}
}
if (needs_repair) {
file_.truncate(trunc_pos);
file_.flush();
file_.sync();
seek(0, std::ios::end);
if (expand) {
file_size_ = file_.getOffset();
file_.expand(max_size, true);
shrunk_ = 0;
}
else {
shrunk_ = 1;
}
file_size_ = file_.getOffset();
stopWriting();
file_.flush();
file_.sync();
seek(0, std::ios::end);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment