Created
May 10, 2024 17:46
-
-
Save maxmouchet/56560d9abfb8878fca42ad9e4d1754e0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
import mmap | |
from collections import defaultdict | |
from sys import argv | |
# cb: control byte | |
# cs: control byte size | |
# dt: data type | |
# ps: payload size (or pointer size if ptr) | |
# pv: pointer value | |
# Byte patterns this script searches for to locate the section boundaries:
# a run of 16 zero bytes precedes the data section, and the marker below
# precedes the metadata section.
DATA_SECTION_SEP = bytes(16)
META_SECTION_SEP = b"\xab\xcd\xef" + b"MaxMind.com"

# Data type number -> label used in the size report. Numbers start at 1 and
# follow the order of the names below.
TYPES = dict(
    enumerate(
        (
            "pointer",
            "string",
            "double",
            "bytes",
            "uint16",
            "uint32",
            "map",
            "int32",
            "uint64",
            "uint128",
            "array",
            "cache",
            "end",
            "boolean",
            "float",
        ),
        start=1,
    )
)
# https://stackoverflow.com/a/31631711 | |
def humanbytes(B):
    """Return the given byte count as a human friendly string.

    Counts below 1024 are reported in bytes; larger counts are scaled to
    KB, MB, GB or TB (powers of 1024) and printed with two decimals.

    Args:
        B: Number of bytes; anything accepted by ``float()``.

    Returns:
        A string such as ``'1.0 Byte'``, ``'512.0 Bytes'`` or ``'1.50 KB'``.
    """
    B = float(B)
    KB = float(1024)
    MB = float(KB ** 2)  # 1,048,576
    GB = float(KB ** 3)  # 1,073,741,824
    TB = float(KB ** 4)  # 1,099,511,627,776

    if B < KB:
        # Only exactly one byte is singular. The previous chained comparison
        # `0 == B > 1` could never be true, so "Bytes" was never printed.
        return '{0} {1}'.format(B, 'Byte' if B == 1 else 'Bytes')
    if B < MB:
        return '{0:.2f} KB'.format(B / KB)
    if B < GB:
        return '{0:.2f} MB'.format(B / MB)
    if B < TB:
        return '{0:.2f} GB'.format(B / GB)
    return '{0:.2f} TB'.format(B / TB)
# Decode the control byte | |
def decode_cb(reader):
    """Decode one MMDB data-section control sequence from ``reader``.

    Args:
        reader: Object with a ``read(n)`` method returning ``bytes``,
            positioned on a control byte.

    Returns:
        A tuple ``(cs, dt, ps, pv)``:

        * ``cs``: number of control bytes consumed (for pointers this is
          always 1; the pointer's own bytes are accounted for as payload).
        * ``dt``: data type number (extended types are already resolved to
          ``7 + <extension byte>``).
        * ``ps``: payload size field, or for pointers the 2-bit pointer size
          selector (0-3, meaning 1-4 bytes follow the control byte).
        * ``pv``: decoded pointer value for pointers, otherwise ``None``.

    Raises:
        EOFError: If ``reader`` runs out of bytes in the middle of a field.
    """

    def read_exact(count):
        # Fail loudly on truncated input instead of decoding missing bytes
        # as zeros.
        chunk = reader.read(count)
        if len(chunk) != count:
            raise EOFError(
                "unexpected end of data: wanted {0} byte(s), got {1}".format(
                    count, len(chunk)
                )
            )
        return chunk

    cs = 1
    cb = read_exact(1)[0]
    dt = cb >> 5

    if dt == 1:  # pointer: control byte layout is 001SSVVV
        ps = (cb >> 3) & 0x3
        if ps == 3:
            # 32-bit pointer: the value is the next four bytes only; the low
            # three bits of the control byte are ignored.
            pv = int.from_bytes(read_exact(4), "big")
        else:
            # 11/19/27-bit pointers: the low three control bits are the most
            # significant bits, followed by ps + 1 bytes. The base offset is
            # added AFTER masking (the old `base + x & mask` applied the mask
            # to the sum because `+` binds tighter than `&`).
            base = (0, 2048, 526336)[ps]
            extra = read_exact(ps + 1)
            pv = base + (((cb & 0x7) << (8 * len(extra))) | int.from_bytes(extra, "big"))
        return cs, dt, ps, pv

    ps = cb & 0x1F
    if dt == 0:  # extended type: real type is 7 + the next byte
        cs += 1
        dt = 7 + read_exact(1)[0]

    # Size fields of 29, 30 and 31 mean the real size is stored in the next
    # 1, 2 or 3 bytes, offset by the largest value the shorter form can hold.
    if ps == 29:
        cs += 1
        ps = 29 + int.from_bytes(read_exact(1), "big")
    elif ps == 30:
        cs += 2
        ps = 285 + int.from_bytes(read_exact(2), "big")
    elif ps == 31:
        cs += 3
        ps = 65821 + int.from_bytes(read_exact(3), "big")

    return cs, dt, ps, None
# Return payload and its size in bytes (as stored in the MMDB file) | |
def decode_payload(reader, dt, ps, pv): | |
match dt: | |
case 1: # Pointer | |
assert pv is not None | |
return pv, ps + 1 | |
case 2: # string | |
return mm.read(ps), ps | |
case 3: # double | |
assert ps == 8 | |
# TODO: Decode double value | |
return mm.read(ps), ps | |
case 4: # bytes | |
return mm.read(ps), ps | |
case 5: # uint16 | |
assert ps <= 2 | |
val = int.from_bytes(mm.read(ps)) | |
return val, ps | |
case 6: # uint32 | |
assert ps <= 4 | |
sizes["uint32"] += ps | |
val = int.from_bytes(mm.read(ps)) | |
return val, ps | |
case 7: # map | |
# We compute the size of the map elements individually | |
return None, 0 | |
case 8: # int32 | |
assert ps <= 4 | |
sizes["int32"] += ps | |
val = int.from_bytes(mm.read(ps), signed=True) | |
return val, ps | |
case 9: # uint64 | |
assert ps <= 8 | |
sizes["uint64"] += ps | |
val = int.from_bytes(mm.read(ps)) | |
return val, ps | |
case 10: # uint128 | |
assert ps <= 16 | |
sizes["uint128"] += ps | |
val = int.from_bytes(mm.read(ps)) | |
return val, ps | |
case 11: # array | |
# Same as map | |
return None, 0 | |
case 12: # cache | |
# Same as map | |
return None, 0 | |
case 13: # end marker | |
assert ps == 0 | |
return None, 0 | |
case 14: # bool | |
assert ps in (0,1) | |
return None, 0 | |
case 15: # float | |
assert ps == 4 | |
# TODO: Decode float value | |
return mm.read(ps), ps | |
if __name__ == "__main__":
    # Usage: <script> <file.mmdb>
    # Walks the data section of an MMDB file and reports how many bytes each
    # data type occupies (control bytes + payload).
    if len(argv) != 2:
        raise SystemExit("usage: {0} <file.mmdb>".format(argv[0]))

    # NOTE: `sizes` and `mm` are deliberately kept as module-level names;
    # helper functions in this script may refer to them as globals.
    sizes = defaultdict(int)

    # ACCESS_READ is the portable spelling of a read-only mapping
    # (prot=PROT_READ is Unix-only); the context manager closes the mapping.
    with open(argv[1], "rb") as f, mmap.mmap(
        f.fileno(), 0, access=mmap.ACCESS_READ
    ) as mm:
        # The LAST occurrence of the marker starts the metadata section; the
        # byte sequence may legitimately appear earlier inside the data.
        meta_marker_pos = mm.rfind(META_SECTION_SEP)
        if meta_marker_pos == -1:
            raise SystemExit("metadata marker not found: not an MMDB file?")

        # NOTE(review): this takes the first run of 16 zero bytes as the
        # tree/data separator. A search tree that itself contains such a run
        # would be misdetected; deriving the tree size from the metadata
        # would be more robust.
        separator_pos = mm.find(DATA_SECTION_SEP, 0, meta_marker_pos)
        if separator_pos == -1:
            raise SystemExit("data section separator not found")

        data_section_start = separator_pos + len(DATA_SECTION_SEP)
        meta_section_start = meta_marker_pos + len(META_SECTION_SEP)
        binary_tree_size = separator_pos
        data_section_size = meta_marker_pos - data_section_start

        mm.seek(data_section_start)
        while mm.tell() < meta_marker_pos:
            cs, dt, ps, pv = decode_cb(mm)
            payload, size = decode_payload(mm, dt, ps, pv)
            sizes[TYPES[dt]] += size + cs

    print("Binary tree size:", humanbytes(binary_tree_size))
    print("Data section size:", humanbytes(data_section_size))
    print("Data section size (recomputed):", humanbytes(sum(sizes.values())))
    print("\n=== Data section breakdown ===\n")
    for dt, size in sorted(sizes.items(), key=lambda x: x[1], reverse=True):
        print(f"{dt}:", humanbytes(size))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment