Created
July 14, 2019 19:56
-
-
Save harlowja/405e255065f3236b4d0db5a8d94d1825 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
void Bag::reindexChunk(uint64_t chunk_pos) { | |
map<uint32_t, multiset<IndexEntry> >::iterator cit; | |
std::map<uint32_t, uint32_t>::iterator it; | |
ChunkHeader chunk_header; | |
ChunkInfo chunk_info; | |
bool chunk_ok = false; | |
uint32_t data_size = 0; | |
uint8_t op = 0xff; | |
readChunkHeader(chunk_header); | |
if (chunk_header.compressed_size == 0) { | |
throw BagFormatException("Unterminated chunk"); | |
} | |
uint64_t offset = file_.getOffset() - chunk_pos; | |
while (offset < chunk_header.uncompressed_size) { | |
op = 0xff; | |
peekNextHeaderOp(op); | |
if (!isValidOp(op)) { | |
throw BagFormatException("Could not peek/read a valid 'op'"); | |
} | |
if (op == OP_MSG_DATA) { | |
ros::Header header; | |
if(!readHeader(header)) { | |
throw BagFormatException("Could not read required OP_MSG_DATA header"); | |
} | |
M_string& fields = *header.getValues(); | |
uint32_t connection_id; | |
if(!readField(fields, CONNECTION_FIELD_NAME, true, &connection_id)) { | |
throw BagFormatException("Could not read required 'conn' header value"); | |
} | |
ros::Time t; | |
if(!readField(fields, TIME_FIELD_NAME, true, &t)) { | |
throw BagFormatException("Could not read required 'time' header value"); | |
} | |
if (!chunk_ok) { | |
chunk_info.pos = chunk_pos; | |
chunk_info.start_time = t; | |
chunk_info.end_time = t; | |
chunk_ok = true; | |
} | |
else { | |
if (t > chunk_info.end_time) { | |
chunk_info.end_time = t; | |
} | |
else if (t < chunk_info.start_time) { | |
chunk_info.start_time = t; | |
} | |
} | |
it = chunk_info.connection_counts.find(connection_id); | |
if (it == chunk_info.connection_counts.end()) { | |
chunk_info.connection_counts[connection_id] = 1; | |
} | |
else { | |
it->second = it->second + 1; | |
} | |
if(!readDataLength(data_size)) { | |
throw BagFormatException("Could not read message data length"); | |
} | |
seek(data_size, std::ios::cur); | |
IndexEntry index_entry; | |
index_entry.time = t; | |
index_entry.chunk_pos = chunk_pos; | |
index_entry.offset = offset; | |
cit = connection_indexes_.find(connection_id); | |
if (cit == connection_indexes_.end()) { | |
throw BagFormatException("Connection id not not preceded by connection record"); | |
} else { | |
cit->second.insert(index_entry); | |
connection_indexes_count_++; | |
} | |
} else if (op == OP_CONNECTION) { | |
ConnectionInfo* ci = readConnectionRecord(); | |
cit = connection_indexes_.find(ci->id); | |
if (cit == connection_indexes_.end()) { | |
connection_indexes_[ci->id] = multiset<IndexEntry>(); | |
} | |
} | |
else { | |
if(!readDataLength(data_size)) { | |
throw BagFormatException("Could not read data length"); | |
} | |
seek(data_size, std::ios::cur); | |
if(!readDataLength(data_size)) { | |
throw BagFormatException("Could not read data length"); | |
} | |
seek(data_size, std::ios::cur); | |
} | |
offset = file_.getOffset() - chunk_pos; | |
} | |
offset = file_.getOffset(); | |
if (offset < file_size_) { | |
data_size = 0; | |
op = 0xff; | |
peekNextHeaderOp(op); | |
if (!isValidOp(op)) { | |
throw BagFormatException("Could not peek/read a valid 'op'"); | |
} | |
while (op != OP_CHUNK) { | |
offset = file_.getOffset(); | |
if (offset >= file_size_) { | |
break; | |
} | |
if (op == OP_INDEX_DATA) { | |
ros::Header header; | |
if (!readHeader(header)) { | |
throw BagFormatException("Error reading INDEX_DATA header"); | |
} | |
M_string& fields = *header.getValues(); | |
uint32_t count; | |
if(!readField(fields, COUNT_FIELD_NAME, true, &count)) { | |
throw BagFormatException("Error reading INDEX_DATA header count value"); | |
} | |
uint32_t v; | |
if(!readField(fields, VER_FIELD_NAME, true, &v)) { | |
throw BagFormatException("Error reading INDEX_DATA header ver value"); | |
} | |
if (v != 1) { | |
throw BagFormatException("Expecting index version 1"); | |
} | |
uint32_t data_size; | |
if (!readDataLength(data_size)) { | |
throw BagFormatException("Error reading INDEX_DATA size"); | |
} | |
uint64_t time; | |
uint32_t offset; | |
for (uint32_t i = 0; i < count; ++i) { | |
read((char*) &time, 8); | |
read((char*) &offset, 4); | |
} | |
} | |
else { | |
if(!readDataLength(data_size)) { | |
throw BagFormatException("Could not read data length"); | |
} | |
seek(data_size, std::ios::cur); | |
if(!readDataLength(data_size)) { | |
throw BagFormatException("Could not read data length"); | |
} | |
seek(data_size, std::ios::cur); | |
} | |
op = 0xff; | |
peekNextHeaderOp(op); | |
if (!isValidOp(op)) { | |
throw BagFormatException("Could not peek/read a valid 'op'"); | |
} | |
} | |
} | |
if (chunk_ok) { | |
chunks_.push_back(chunk_info); | |
} | |
connection_count_ = connections_.size(); | |
chunk_count_ = chunks_.size(); | |
} | |
void Bag::reindex(string const& filename, FILE* f, uint64_t max_size, bool expand) { | |
if (file_.isOpen()) { | |
throw BagException("Bag is already open"); | |
} | |
file_.openWriteFile(filename, f); | |
readVersion(); | |
if (version_ != 200) { | |
throw BagException((format("Bag file version %1%.%2% is unsupported for reindexing") % getMajorVersion() % getMinorVersion()).str()); | |
} | |
if (compression_ != compression::Uncompressed) { | |
throw BagException("Bag reindexing currently only works for uncompressed bags (use the python implementation for other ones)"); | |
} | |
ros::Header header; | |
uint32_t data_size = 0; | |
if (!readHeader(header) || !readDataLength(data_size)) { | |
throw BagFormatException("Error reading FILE_HEADER record"); | |
} | |
M_string& fields = *header.getValues(); | |
if (!isOp(fields, OP_FILE_HEADER)) { | |
throw BagFormatException("Expected FILE_HEADER op not found"); | |
} | |
readField(fields, INDEX_POS_FIELD_NAME, true, (uint64_t*) &index_data_pos_); | |
if (index_data_pos_ != 0) { | |
file_.close(); | |
return; | |
} | |
// Skip the data section (just padding) | |
seek(data_size, std::ios::cur); | |
// Determine max size. | |
uint64_t pos = file_.getOffset(); | |
seek(0, std::ios::end); | |
file_size_ = file_.getOffset(); | |
seek(pos); | |
// Basically this is moving from python into here, | |
// https://github.com/strawlab/ros_comm/blob/master/tools/rosbag/src/rosbag/bag.py#L1871 | |
// Maybe someday we can submit this for https://github.com/ros/ros_comm/issues/1353 | |
uint64_t trunc_pos = 0; | |
bool needs_repair = false; | |
while (true) { | |
uint64_t offset = file_.getOffset(); | |
if (offset >= file_size_) { | |
break; | |
} | |
trunc_pos = offset; | |
try { | |
reindexChunk(offset); | |
} | |
catch (const BagFormatException &e) { | |
needs_repair = true; | |
break; | |
} | |
catch (const BagIOException &e) { | |
needs_repair = true; | |
break; | |
} | |
} | |
if (needs_repair) { | |
file_.truncate(trunc_pos); | |
file_.flush(); | |
file_.sync(); | |
seek(0, std::ios::end); | |
if (expand) { | |
file_size_ = file_.getOffset(); | |
file_.expand(max_size, true); | |
shrunk_ = 0; | |
} | |
else { | |
shrunk_ = 1; | |
} | |
file_size_ = file_.getOffset(); | |
stopWriting(); | |
file_.flush(); | |
file_.sync(); | |
seek(0, std::ios::end); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment