Last active
March 25, 2018 14:00
-
-
Save tvoinarovskyi/5b0d8b7c26f72ab3f270cae55c51678c to your computer and use it in GitHub Desktop.
Command line tools for some strange X file format. Reference code for my slides at UAPyCon2018 "Binary data in Python with a bit of C spice on top"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This file contains a command-line utility for working with X file format | |
# X file contains sequential messages in the following format: | |
# | |
# MessageID => Int64 | |
# MessageLength => Int32 | |
# MessageCRC => UInt32 | |
# MessageFormatVersion => Int8 | |
# MessageBody => Bytes | |
# | |
# We assume that message IDs appear in increasing order
# | |
# Index format will be stored as: | |
# | |
# ArraySize => Int32 | |
# Array[ArraySize] => | |
# MessageID => Int64 | |
# MessageOffset => UInt64 | |
# | |
import argparse | |
import io | |
import pathlib | |
import struct | |
import sys | |
import array | |
import bisect | |
import random | |
from binascii import crc32 | |
MAX_CHUNK_SIZE = 1024 * 1024 | |
HEADER = struct.Struct(">qiIb") | |
class CorruptedFile(RuntimeError):
    """Raised when an X file or index file ends before the expected data."""
def process_file(f, out):
    """Build a message-id -> file-offset index for an X file.

    Scans ``f`` header by header, skipping each message body with a relative
    seek, and writes the index to ``out`` as a big-endian Int32 entry count
    followed by ``(message_id, offset)`` pairs of unsigned 64-bit integers.
    The pairs are written straight from an ``array.array("Q")`` buffer, i.e.
    in the machine's native byte order.

    Args:
        f: ``pathlib.Path`` of the X file to scan.
        out: ``pathlib.Path`` the index is written to.

    Raises:
        CorruptedFile: if the file ends inside a header or inside a message
            body, or if a header declares a negative body length.
    """
    print("Building index for X file: {} to {}".format(
        f.resolve(), out.resolve()))

    header_size = HEADER.size
    header_buf = bytearray(header_size)
    # Flat layout [id0, offset0, id1, offset1, ...].  Typecode "Q" is
    # unsigned, so appending a negative message id raises OverflowError.
    result_index = array.array("Q")

    # Iterate over the inbound file and build an index of message_id to file
    # offset.  Buffered reads keep the many small header reads cheap.
    with f.open("rb", buffering=MAX_CHUNK_SIZE) as reader:
        file_size = reader.seek(0, io.SEEK_END)
        reader.seek(0)
        offset = 0
        while True:
            readlen = reader.readinto(header_buf)
            if readlen == 0:  # Clean EOF on a message boundary
                break
            if readlen != header_size:  # EOF without a full header
                raise CorruptedFile()

            message_id, message_len = HEADER.unpack(header_buf)[:2]
            next_offset = offset + header_size + message_len
            # A negative length would seek backwards (possibly looping
            # forever); a length running past EOF means a truncated body,
            # which a plain seek would silently accept.
            if message_len < 0 or next_offset > file_size:
                raise CorruptedFile()

            result_index.append(message_id)
            result_index.append(offset)
            reader.seek(message_len, io.SEEK_CUR)
            offset = next_offset

    entry_count = len(result_index) // 2
    with out.open("wb") as file:
        file.write(struct.pack(">i", entry_count))
        file.write(result_index)
    print("Successfully written {} size index".format(entry_count))
def lookup_offset(index_file, message_id):
    """Print the X-file offset stored in an index for ``message_id``.

    The index is a big-endian Int32 entry count followed by that many
    ``(message_id, offset)`` pairs of 64-bit values, which are interpreted
    as native-order unsigned integers.  Entries must be sorted by message id
    because the lookup is a binary search.

    Args:
        index_file: ``pathlib.Path`` of the index file.
        message_id: integer id to search for.

    Raises:
        CorruptedFile: if the index is shorter than its entry count implies.
    """
    size_field = struct.Struct(">i")
    item_size = struct.calcsize(">qQ")

    with index_file.open("rb", buffering=MAX_CHUNK_SIZE) as reader:
        raw_size = reader.read(size_field.size)
        if len(raw_size) != size_field.size:
            raise CorruptedFile()
        array_size = size_field.unpack(raw_size)[0]
        if array_size < 0:
            raise CorruptedFile()
        lookup_index = reader.read(array_size * item_size)
    if len(lookup_index) != array_size * item_size:
        raise CorruptedFile()

    # memoryview.cast() rejects a zero-length dimension, so an empty index
    # is handled without casting.
    if array_size:
        lookup_table = \
            memoryview(lookup_index).cast("Q", (array_size, 2)).tolist()
    else:
        lookup_table = []

    # bisect_left with a zero offset lands exactly on the entry for
    # message_id when it exists, including an entry whose offset is 0
    # (bisect_right would step past that one).
    index = bisect.bisect_left(lookup_table, [message_id, 0])
    if index < len(lookup_table) and lookup_table[index][0] == message_id:
        print("Found message ID={}. Offset {}".format(
            message_id, lookup_table[index][1]))
    else:
        print("Could not find message ID={}".format(message_id))
def lookup_message(f, index_file, message_id, read_msg):
    """Find ``message_id`` through the index and optionally print the message.

    The index is a big-endian Int32 entry count followed by that many
    ``(message_id, offset)`` pairs of 64-bit values, interpreted as
    native-order unsigned integers and searched with a binary search, so
    entries must be sorted by message id.

    Args:
        f: ``pathlib.Path`` of the X file; only opened when ``read_msg`` is
            true and the id was found.
        index_file: ``pathlib.Path`` of the index file.
        message_id: integer id to search for.
        read_msg: when true, also read the message at the found offset,
            verify its CRC and print its header and body.

    Raises:
        CorruptedFile: if the index is shorter than its entry count implies,
            if the message header or body cannot be read completely, or if
            the body's CRC32 does not match the header.
    """
    size_field = struct.Struct(">i")
    item_size = struct.calcsize(">qQ")

    with index_file.open("rb", buffering=MAX_CHUNK_SIZE) as reader:
        raw_size = reader.read(size_field.size)
        if len(raw_size) != size_field.size:
            raise CorruptedFile()
        array_size = size_field.unpack(raw_size)[0]
        if array_size < 0:
            raise CorruptedFile()
        lookup_index = reader.read(array_size * item_size)
    if len(lookup_index) != array_size * item_size:
        raise CorruptedFile()

    # memoryview.cast() rejects a zero-length dimension, so an empty index
    # is handled without casting.
    if array_size:
        lookup_table = \
            memoryview(lookup_index).cast("Q", (array_size, 2)).tolist()
    else:
        lookup_table = []

    # bisect_left with a zero offset lands exactly on the entry for
    # message_id when it exists, including an entry whose offset is 0
    # (bisect_right would step past that one).
    offset = None
    index = bisect.bisect_left(lookup_table, [message_id, 0])
    if index < len(lookup_table) and lookup_table[index][0] == message_id:
        offset = lookup_table[index][1]
        print("Found message ID={}. Offset {}".format(message_id, offset))
    else:
        print("Could not find message ID={}".format(message_id))

    if not read_msg or offset is None:
        return

    with f.open("rb", buffering=MAX_CHUNK_SIZE) as reader:
        reader.seek(offset)
        raw_header = reader.read(HEADER.size)
        if len(raw_header) != HEADER.size:
            raise CorruptedFile()
        header = HEADER.unpack(raw_header)
        print("Message header: id={0}, length={1}, crc={2}, format={3}"
              .format(*header))
        body = reader.read(header[1])

    if len(body) != header[1]:
        raise CorruptedFile()
    # Raised explicitly rather than asserted: asserts are stripped when
    # Python runs with -O, which would skip the integrity check.
    if header[2] != crc32(body):
        raise CorruptedFile("CRC does not match...")
    print("Message body: \n{}".format(body))
def build_test(f, message_n=1_000_000, max_message_size=1024): | |
print("Building test data for X file: {}".format(f.resolve())) | |
with f.open("wb") as file: | |
format_version = 1 | |
for message_id in range(message_n): | |
message_size = random.randint(10, max_message_size) | |
struct_fmt = ">qiIb{:d}s".format(message_size) | |
message = b"X" * message_size | |
message_crc = crc32(message) | |
packed = struct.pack( | |
struct_fmt, | |
message_id, message_size, message_crc, | |
format_version, message) | |
file.write(packed) | |
def main():
    """Parse the command line and run the selected sub-command.

    ``build-index`` calls ``process_file``, ``build-test`` calls
    ``build_test`` and ``lookup-offset`` calls ``lookup_message``.  Without
    a sub-command the parsed ``--input`` value is printed via ``repr``.

    A sub-command that is missing a path it needs is reported through
    ``parser.error`` (usage message, exit status 2) instead of failing later
    with an ``AttributeError`` on ``None``.
    """
    parser = get_parser()
    args = parser.parse_args()

    # Every sub-command operates on the X file given with -i/--input.
    if args.cmd is not None and args.input is None:
        parser.error("the following arguments are required: -i/--input")

    if args.cmd == "build_index":
        if args.output is None:
            parser.error("the following arguments are required: -o/--output")
        process_file(args.input, args.output)
    elif args.cmd == "build_test":
        build_test(args.input)
    elif args.cmd == "lookup_offset":
        lookup_message(
            args.input, args.index_file, args.message_id, args.read_msg)
    else:
        print(repr(args.input))
def get_parser():
    """Create the command-line parser for the X file tool.

    The top-level parser takes ``-i/--input`` and defaults ``cmd`` to
    ``None``; each sub-command sets ``cmd`` to its own name:

    * ``build-index`` (``cmd="build_index"``): requires ``-o/--output``.
    * ``build-test`` (``cmd="build_test"``): no extra arguments.
    * ``lookup-offset`` (``cmd="lookup_offset"``): requires ``--index-file``
      and ``--message-id``; ``--read-msg`` is an optional flag.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    parser = argparse.ArgumentParser(description='X filetype processor')
    parser.add_argument(
        "-i", "--input", type=pathlib.Path,
        help="File path containing the X file format file")
    parser.set_defaults(cmd=None)
    subparsers = parser.add_subparsers(help='sub-command help')

    # Re-Build file index
    build_index = subparsers.add_parser(
        "build-index", help="Iterate over all messages and build msg_id "
                            "to file offset mapping")
    build_index.set_defaults(cmd="build_index")
    # Required: building an index without a destination cannot succeed.
    build_index.add_argument(
        "-o", "--output", type=pathlib.Path, required=True,
        help="File path where to write the resulting index")

    # Build a test file with a lot of data
    build_test = subparsers.add_parser(
        "build-test", help="Create a testing file in X format")
    build_test.set_defaults(cmd="build_test")

    # Lookup the message in X file
    lookup_offset = subparsers.add_parser(
        "lookup-offset", help="Get the offset in X file of message with id")
    lookup_offset.add_argument(
        "--index-file", type=pathlib.Path, required=True,
        help="File path from which to read the index file")
    lookup_offset.add_argument(
        "--message-id", type=int, required=True,
        help="ID of the message, that we need to lookup")
    lookup_offset.add_argument(
        "--read-msg", action="store_true",
        help="Also read the found message from the X file, check its CRC "
             "and print it")
    lookup_offset.set_defaults(cmd="lookup_offset")
    return parser
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment