Skip to content

Instantly share code, notes, and snippets.

@tvoinarovskyi
Last active March 25, 2018 14:00
Show Gist options
  • Save tvoinarovskyi/5b0d8b7c26f72ab3f270cae55c51678c to your computer and use it in GitHub Desktop.
Save tvoinarovskyi/5b0d8b7c26f72ab3f270cae55c51678c to your computer and use it in GitHub Desktop.
Command line tools for some strange X file format. Reference code for my slides at UAPyCon2018 "Binary data in Python with a bit of C spice on top"
# This file contains a command-line utility for working with X file format
# X file contains sequential messages in the following format:
#
# MessageID => Int64
# MessageLength => Int32
# MessageCRC => UInt32
# MessageFormatVersion => Int8
# MessageBody => Bytes
#
# We assume that message IDs are stored in increasing order
#
# The index file is stored as:
#
# ArraySize => Int32 (big-endian)
# Array[ArraySize] =>
#   MessageID => UInt64 (native byte order)
#   MessageOffset => UInt64 (native byte order)
#
import argparse
import io
import pathlib
import struct
import sys
import array
import bisect
import random
from binascii import crc32
# Buffer size, in bytes, used when reading the X file and the index (1 MiB).
MAX_CHUNK_SIZE = 1 << 20

# Pre-compiled layout of the fixed-size header that precedes every message.
HEADER = struct.Struct(
    ">"  # big-endian, standard sizes, no alignment padding
    "q"  # MessageID => Int64
    "i"  # MessageLength => Int32
    "I"  # MessageCRC => UInt32
    "b"  # MessageFormatVersion => Int8
)
class CorruptedFile(RuntimeError):
    """Signals that an X file or an index file could not be parsed."""
def process_file(f, out):
    """Build a message-ID -> file-offset index for an X file.

    The X file is scanned header by header; message bodies are skipped with
    a relative seek and never read. The result is written to *out* as a
    big-endian Int32 entry count followed by the flat sequence of
    ``message_id, offset`` values as unsigned 64-bit integers in the
    machine's native byte order.

    Args:
        f: ``pathlib.Path`` of the X file to scan.
        out: ``pathlib.Path`` of the index file to write.

    Raises:
        CorruptedFile: if the file ends in the middle of a header, a header
            declares a negative body length, or a body extends past the
            end of the file.
    """
    print("Building index for X file: {} to {}".format(
        f.resolve(), out.resolve()))

    header_size = HEADER.size
    header_buf = bytearray(header_size)
    # Flat array of alternating message_id / offset values.
    # NOTE: "Q" is unsigned, so a negative message ID raises OverflowError.
    result_index = array.array("Q")

    with f.open("rb", buffering=0) as file:
        # Total size of the file, used to detect bodies that run past EOF.
        file_size = file.seek(0, io.SEEK_END)
        file.seek(0)

        reader = io.BufferedReader(file, buffer_size=MAX_CHUNK_SIZE)
        while True:
            # Small reads are cheap because the reader is buffered.
            readlen = reader.readinto(header_buf)
            if readlen == 0:  # Clean EOF on a message boundary
                break
            if readlen != header_size:  # EOF without a full header
                raise CorruptedFile("Truncated message header")

            message_id, message_len = HEADER.unpack(header_buf)[:2]
            body_start = reader.tell()

            # A negative length would seek backwards and could re-read the
            # same headers forever; an oversized one means a cut-off body.
            if message_len < 0:
                raise CorruptedFile("Negative message length")
            if body_start + message_len > file_size:
                raise CorruptedFile("Truncated message body")

            result_index.append(message_id)
            result_index.append(body_start - header_size)
            reader.seek(message_len, io.SEEK_CUR)

    entry_count = len(result_index) // 2
    with out.open("wb") as file:
        file.write(struct.pack(">i", entry_count))
        file.write(result_index)
    print("Successfully written {} size index".format(entry_count))
def lookup_offset(index_file, message_id):
    """Print the X-file offset stored in the index for *message_id*.

    The index is read fully into memory and searched with a binary search,
    which relies on the entries being sorted by message ID.

    Args:
        index_file: ``pathlib.Path`` of the index file to read.
        message_id: integer ID of the message to look up.

    Raises:
        CorruptedFile: if the index is shorter than its size prefix
            declares, or the size prefix itself is missing or negative.
    """
    count_size = struct.calcsize(">i")
    item_size = struct.calcsize(">qQ")

    with index_file.open("rb", buffering=MAX_CHUNK_SIZE) as reader:
        raw_count = reader.read(count_size)
        if len(raw_count) != count_size:
            raise CorruptedFile()
        array_size = struct.unpack(">i", raw_count)[0]
        if array_size < 0:
            raise CorruptedFile()
        lookup_index = reader.read(array_size * item_size)
    if len(lookup_index) != array_size * item_size:
        raise CorruptedFile()

    if array_size:
        # View the raw bytes as rows of [message_id, offset].
        lookup_table = \
            memoryview(lookup_index).cast("Q", (array_size, 2)).tolist()
    else:
        # An empty index has no rows to reshape.
        lookup_table = []

    # bisect_left lands on the first row >= [message_id, 0], i.e. on the
    # row for message_id itself when present. bisect_right would step past
    # a row whose offset is 0 (the first message in the file).
    index = bisect.bisect_left(lookup_table, [message_id, 0])
    if index < len(lookup_table) and lookup_table[index][0] == message_id:
        print("Found message ID={}. Offset {}".format(
            message_id, lookup_table[index][1]))
    else:
        print("Could not find message ID={}".format(message_id))
def lookup_message(f, index_file, message_id, read_msg):
    """Look up *message_id* in the index and optionally print the message.

    The index is read fully into memory and searched with a binary search,
    which relies on the entries being sorted by message ID. The outcome of
    the lookup is printed. When *read_msg* is true and the ID was found,
    the message is read from the X file, its CRC is verified and its header
    and body are printed.

    Args:
        f: ``pathlib.Path`` of the X file.
        index_file: ``pathlib.Path`` of the index file to read.
        message_id: integer ID of the message to look up.
        read_msg: whether to read and print the message itself.

    Raises:
        CorruptedFile: if the index or the message is truncated, a declared
            length is negative, or the body does not match the stored CRC.
    """
    count_size = struct.calcsize(">i")
    item_size = struct.calcsize(">qQ")
    offset = None

    with index_file.open("rb", buffering=MAX_CHUNK_SIZE) as reader:
        raw_count = reader.read(count_size)
        if len(raw_count) != count_size:
            raise CorruptedFile()
        array_size = struct.unpack(">i", raw_count)[0]
        if array_size < 0:
            raise CorruptedFile()
        lookup_index = reader.read(array_size * item_size)
    if len(lookup_index) != array_size * item_size:
        raise CorruptedFile()

    if array_size:
        # View the raw bytes as rows of [message_id, offset].
        lookup_table = \
            memoryview(lookup_index).cast("Q", (array_size, 2)).tolist()
    else:
        # An empty index has no rows to reshape.
        lookup_table = []

    # bisect_left lands on the first row >= [message_id, 0], i.e. on the
    # row for message_id itself when present. bisect_right would step past
    # a row whose offset is 0 (the first message in the file).
    index = bisect.bisect_left(lookup_table, [message_id, 0])
    if index < len(lookup_table) and lookup_table[index][0] == message_id:
        offset = lookup_table[index][1]
        print("Found message ID={}. Offset {}".format(message_id, offset))
    else:
        print("Could not find message ID={}".format(message_id))

    if read_msg and offset is not None:
        with f.open("rb", buffering=MAX_CHUNK_SIZE) as reader:
            reader.seek(offset)
            raw_header = reader.read(HEADER.size)
            if len(raw_header) != HEADER.size:
                raise CorruptedFile("Truncated message header")
            header = HEADER.unpack(raw_header)
            print("Message header: id={0}, length={1}, crc={2}, format={3}"
                  .format(*header))
            if header[1] < 0:
                raise CorruptedFile("Negative message length")
            body = reader.read(header[1])
        # Raise explicitly: an assert would be stripped under ``python -O``
        # and let a corrupted body be printed as if it were valid.
        if header[2] != crc32(body):
            raise CorruptedFile("CRC does not match...")
        print("Message body: \n{}".format(body))
def build_test(f, message_n=1_000_000, max_message_size=1024):
    """Write ``message_n`` synthetic messages in X format to the file *f*.

    Message IDs run from 0 to ``message_n - 1``. Every body consists of
    ``b"X"`` bytes and has a random length between 10 and
    ``max_message_size`` inclusive; the format version is always 1.
    """
    print("Building test data for X file: {}".format(f.resolve()))
    version = 1
    with f.open("wb") as sink:
        for msg_id in range(message_n):
            body_len = random.randint(10, max_message_size)
            body = b"X" * body_len
            # The layout is big-endian without padding, so the record is
            # simply the packed header followed by the raw body bytes.
            head = struct.pack(
                ">qiIb", msg_id, body_len, crc32(body), version)
            sink.write(head + body)
def main():
    """Parse the command line and run the selected sub-command.

    Exits through ``parser.error`` (usage message, status 2) when a
    sub-command is given without the file paths it needs, instead of
    failing later on a ``None`` path.
    """
    parser = get_parser()
    args = parser.parse_args()

    # Every sub-command operates on the X file passed with -i/--input.
    if args.cmd is not None and args.input is None:
        parser.error("the following arguments are required: -i/--input")

    if args.cmd == "build_index":
        if args.output is None:
            parser.error("the following arguments are required: -o/--output")
        process_file(args.input, args.output)
    elif args.cmd == "build_test":
        build_test(args.input)
    elif args.cmd == "lookup_offset":
        lookup_message(
            args.input, args.index_file, args.message_id, args.read_msg)
    else:
        # No sub-command was given; only echo the parsed input path.
        print(repr(args.input))
def get_parser():
    """Create the command-line parser for the X file utility.

    The top-level parser takes ``-i/--input`` (which must precede the
    sub-command on the command line) and offers three sub-commands:
    ``build-index``, ``build-test`` and ``lookup-offset``. Each sub-command
    stores its name, with underscores, in ``cmd``; ``cmd`` is ``None`` when
    no sub-command is given.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    parser = argparse.ArgumentParser(description='X filetype processor')
    parser.add_argument(
        "-i", "--input", type=pathlib.Path,
        help="File path containing the X file format file")
    parser.set_defaults(cmd=None)
    subparsers = parser.add_subparsers(help='sub-command help')

    # Re-Build file index
    build_index = subparsers.add_parser(
        "build-index", help="Iterate over all messages and build msg_id "
                            "to file offset mapping")
    build_index.set_defaults(cmd="build_index")
    # The index has nowhere to go without an output path, so require it.
    build_index.add_argument(
        "-o", "--output", type=pathlib.Path, required=True,
        help="File path where to write the resulting index")

    # Build a test file with a lot of data
    build_test = subparsers.add_parser(
        "build-test", help="Create a testing file in X format")
    build_test.set_defaults(cmd="build_test")

    # Lookup the message in X file
    lookup_offset = subparsers.add_parser(
        "lookup-offset", help="Get the offset in X file of message with id")
    lookup_offset.add_argument(
        "--index-file", type=pathlib.Path, required=True,
        help="File path from which to read the index file")
    lookup_offset.add_argument(
        "--message-id", type=int, required=True,
        help="ID of the message, that we need to lookup")
    lookup_offset.add_argument(
        "--read-msg", action="store_true",
        help="Also read the message from the X file and print it")
    lookup_offset.set_defaults(cmd="lookup_offset")
    return parser
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment