-
-
Save kmdupr33/27c1232cbb7e580f44098b33f27a620a to your computer and use it in GitHub Desktop.
yuck
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import struct | |
def read_varint(data, offset):
    """Read one SQLite variable-length integer (varint) from ``data``.

    A varint occupies 1 to 9 bytes, most significant group first. Each of
    the first eight bytes contributes its low 7 bits and uses its high bit
    as a "more bytes follow" flag. If a ninth byte is reached, all 8 of its
    bits are payload, giving 64 bits in total.

    Args:
        data: Indexable byte sequence (e.g. ``bytes`` or ``bytearray``).
        offset: Index of the first byte of the varint.

    Returns:
        A ``(value, next_offset)`` tuple: the decoded integer, interpreted as
        unsigned, and the index of the first byte after the varint.

    Raises:
        ValueError: If ``data`` ends before the varint is complete.
    """
    value = 0
    for index in range(9):
        if offset >= len(data):
            raise ValueError("Offset out of range while reading varint.")
        byte = data[offset]
        offset += 1
        if index == 8:
            # Ninth byte: no continuation flag, all 8 bits belong to the value.
            value = (value << 8) | byte
            break
        value = (value << 7) | (byte & 0x7F)
        if not byte & 0x80:
            # High bit clear marks the final byte of the varint.
            break
    return value, offset
# Byte widths of the fixed-size signed integer serial types (big-endian).
_INT_SERIAL_WIDTHS = {1: 1, 2: 2, 3: 3, 4: 4, 5: 6, 6: 8}


def _decode_serial_value(page, offset, serial_type):
    """Decode one record column of ``serial_type``; return (value, next_offset)."""
    if serial_type in _INT_SERIAL_WIDTHS or serial_type == 7:
        width = _INT_SERIAL_WIDTHS.get(serial_type, 8)
        raw = bytes(page[offset:offset + width])
        if len(raw) != width:
            # Same exception type struct.unpack raises on a short buffer.
            raise struct.error(
                f"record field needs {width} bytes, only {len(raw)} available"
            )
        if serial_type == 7:
            value = struct.unpack('>d', raw)[0]
        else:
            # Two's-complement, so 24-bit and 48-bit values sign-extend correctly.
            value = int.from_bytes(raw, 'big', signed=True)
        return value, offset + width
    if serial_type == 8:
        return 0, offset  # Constant integer 0, no content bytes
    if serial_type == 9:
        return 1, offset  # Constant integer 1, no content bytes
    if serial_type >= 12:
        # Even N is a BLOB of (N-12)/2 bytes, odd N is TEXT of (N-13)/2 bytes;
        # floor division yields the right length for both.
        length = (serial_type - 12) // 2
        raw = bytes(page[offset:offset + length])
        if serial_type % 2:
            # NOTE: UTF-8 is assumed; the file header's text-encoding field
            # is not consulted here.
            return raw.decode('utf-8', errors='replace'), offset + length
        return raw, offset + length
    # 0 is NULL; 10 and 11 are reserved serial types.
    return None, offset


def _parse_leaf_cell(page, cell_offset):
    """Decode the record stored in one table-leaf cell into a list of values."""
    _payload_size, offset = read_varint(page, cell_offset)
    _rowid, offset = read_varint(page, offset)
    # The record header length counts the bytes of its own size varint, so
    # the header is measured from where that varint starts.
    header_start = offset
    header_length, offset = read_varint(page, offset)
    header_end = header_start + header_length
    serial_types = []
    while offset < header_end:
        serial_type, offset = read_varint(page, offset)
        serial_types.append(serial_type)
    fields = []
    for serial_type in serial_types:
        value, offset = _decode_serial_value(page, offset, serial_type)
        fields.append(value)
    return fields


def parse_page(data, page_size, page_number, header_size=100):
    """Parse a table b-tree page and return the records found beneath it.

    Interior table pages (type 5) are descended recursively, left children
    first and the right-most child last. Leaf table pages (type 13) have each
    cell's record decoded. Any other page type is reported with a printed
    message and contributes no entries.

    Limitations: payload that spills onto overflow pages is not followed, so
    such records are decoded only from the bytes present on the leaf page.
    Text is decoded as UTF-8.

    Args:
        data: The complete database file contents.
        page_size: Size of each page in bytes.
        page_number: 1-based number of the page to parse.
        header_size: Size of the database file header that occupies the start
            of page 1; the b-tree header of page 1 begins right after it.

    Returns:
        A list of records, each a list of decoded column values. The list is
        empty if the page is out of range or of an unsupported type.
    """
    entries = []
    # Pages are laid out back to back from file offset 0; the file header is
    # part of page 1, not a prefix in front of it.
    page_offset = (page_number - 1) * page_size
    if page_number < 1 or page_offset + page_size > len(data):
        print(f"Page number {page_number} is out of range.")
        return entries
    page = data[page_offset:page_offset + page_size]
    # Cell pointers are relative to the start of the page, but on page 1 the
    # b-tree header itself is pushed back by the file header.
    btree_header = header_size if page_number == 1 else 0
    page_type = page[btree_header]
    cell_count = struct.unpack_from('>H', page, btree_header + 3)[0]
    if page_type == 5:  # Interior table b-tree page (12-byte header)
        right_most_pointer = struct.unpack_from('>I', page, btree_header + 8)[0]
        pointer_array = btree_header + 12
        for index in range(cell_count):
            cell_offset = struct.unpack_from('>H', page, pointer_array + 2 * index)[0]
            # Each interior cell starts with a 4-byte left child page number;
            # the rowid key that follows is not needed for a full scan.
            child_page = struct.unpack_from('>I', page, cell_offset)[0]
            entries.extend(parse_page(data, page_size, child_page, header_size))
        # Don't forget the right-most child page
        entries.extend(parse_page(data, page_size, right_most_pointer, header_size))
    elif page_type == 13:  # Leaf table b-tree page (8-byte header)
        pointer_array = btree_header + 8
        for index in range(cell_count):
            cell_offset = struct.unpack_from('>H', page, pointer_array + 2 * index)[0]
            entries.append(_parse_leaf_cell(page, cell_offset))
    else:
        print(f"Unsupported page type: {page_type}")
    return entries
def parse_sqlite_master(database_path):
    """Print the records reachable from page 1 of a SQLite database file.

    The file is read fully into memory and its 16-byte magic string is
    checked. The page size is taken from the big-endian 16-bit field at byte
    16, where a stored value of 1 stands for 65536. The records returned by
    ``parse_page`` for page 1 are then printed one per line, fields joined
    with " | ", under a fixed column heading.

    Args:
        database_path: Path of the database file to open in binary mode.

    Returns:
        None. All results, including the "not valid" and "no entries"
        notices, are written to standard output.
    """
    with open(database_path, 'rb') as db_file:
        data = db_file.read()

    # Check SQLite file header
    magic = b'SQLite format 3\x00'
    if not data.startswith(magic):
        print("Not a valid SQLite database file.")
        return

    header_size = 100  # The SQLite database header is 100 bytes
    (stored_page_size,) = struct.unpack('>H', data[16:18])
    page_size = 65536 if stored_page_size == 1 else stored_page_size

    # Start parsing from the first page (page number 1)
    rows = parse_page(data, page_size, 1, header_size)
    if not rows:
        print("No entries found in sqlite_master.")
        return

    # sqlite_master table columns: type, name, tbl_name, rootpage, sql
    print("type | name | tbl_name | rootpage | sql")
    for row in rows:
        print(" | ".join(map(str, row)))
# Example usage
database_path = 'your_database_file.db'  # Replace with your SQLite database file path

if __name__ == "__main__":
    # Run the example only when executed as a script, so that importing this
    # module does not try to open the placeholder path above.
    parse_sqlite_master(database_path)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment