Skip to content

Instantly share code, notes, and snippets.

@t04glovern
Last active July 11, 2024 12:41
Show Gist options
  • Save t04glovern/aec9e2391aa94eb017f1bc265feeac7c to your computer and use it in GitHub Desktop.
Save t04glovern/aec9e2391aa94eb017f1bc265feeac7c to your computer and use it in GitHub Desktop.
This script reads and processes Puffin files, extracting and printing blob metadata. It supports reading metadata from the file footer and decompressing the data using the specified compression codec.
#!/usr/bin/env python3
"""
This script reads and processes Puffin files, extracting and printing blob metadata.
It supports reading metadata from the file footer and decompressing the
data using the specified compression codec.
Spec:
https://iceberg.apache.org/puffin-spec/
Install:
python3 -m venv .venv
source .venv/bin/activate
pip3 install zstandard
curl https://gist.github.com/t04glovern/aec9e2391aa94eb017f1bc265feeac7c/raw \
> iceberg-puffin-reader.py \
&& chmod +x iceberg-puffin-reader.py
Usage:
./iceberg-puffin-reader.py --file <file-path> [--pretty]
"""
import argparse
import io
import logging
import json
import struct
from enum import Enum
from typing import List, Tuple, Dict, Optional, Any
import zstandard as zstd
import pprint
# Configure the root logger at INFO so the logging.info() calls in main() are
# emitted; logging.debug() output stays suppressed at this level.
logging.basicConfig(level=logging.INFO)
class PuffinCompressionCodec(Enum):
    """Compression codecs that can be named for a Puffin blob or footer.

    Each member's value is the codec name used for lookup in ``for_name``;
    ``NONE`` carries the value ``None`` and stands for "no compression".
    """

    NONE = None
    LZ4 = "lz4"
    ZSTD = "zstd"

    @staticmethod
    def for_name(name: Optional[str]) -> 'PuffinCompressionCodec':
        """Return the member whose value equals *name*.

        Args:
            name: Codec name to look up; ``None`` selects ``NONE``.

        Raises:
            ValueError: If no member has *name* as its value.
        """
        # Members are never None themselves, so None is a safe "not found" sentinel.
        found = next(
            (member for member in PuffinCompressionCodec if member.value == name),
            None,
        )
        if found is None:
            raise ValueError(f"Unknown codec name: {name}")
        return found
class Flag(Enum):
    """Footer flags, each identified by a ``(byte_number, bit_number)`` pair."""

    FOOTER_PAYLOAD_COMPRESSED = (0, 0)

    @staticmethod
    def from_bit(byte_number: int, bit_number: int) -> Optional['Flag']:
        """Look up the flag stored at the given byte/bit position.

        Returns:
            The member whose value is ``(byte_number, bit_number)``, or
            ``None`` when no member is defined for that position.
        """
        position = (byte_number, bit_number)
        return next((member for member in Flag if member.value == position), None)
class PuffinFormat:
    """Byte-layout constants and codec helpers shared by Puffin readers."""

    # Position and size of the magic that opens the footer (relative to the
    # first footer byte).
    FOOTER_START_MAGIC_OFFSET = 0
    FOOTER_START_MAGIC_LENGTH = 4

    # Field positions inside the fixed-size struct that ends the footer:
    # payload size, then flags, then the closing magic.
    FOOTER_STRUCT_PAYLOAD_SIZE_OFFSET = 0
    FOOTER_STRUCT_FLAGS_OFFSET = 4
    FOOTER_STRUCT_FLAGS_LENGTH = 4
    FOOTER_STRUCT_MAGIC_OFFSET = 8
    FOOTER_STRUCT_LENGTH = 12

    # Codec applied to the footer payload when the "compressed" flag is set.
    # NOTE: decompress() below has no LZ4 branch, so such footers currently
    # end in NotImplementedError.
    FOOTER_COMPRESSION_CODEC = PuffinCompressionCodec.LZ4

    @staticmethod
    def get_magic() -> bytes:
        """Return the four magic bytes 0x50 0x46 0x41 0x31."""
        return b"\x50\x46\x41\x31"

    @staticmethod
    def read_integer_little_endian(data: bytes, offset: int) -> int:
        """Decode the unsigned 32-bit little-endian integer at *offset* in *data*."""
        (value,) = struct.unpack_from('<I', data, offset)
        return value

    @staticmethod
    def compress(codec: PuffinCompressionCodec, input_data: bytes) -> bytes:
        """Compress *input_data* with *codec*.

        ``NONE`` returns the input unchanged and ``ZSTD`` delegates to the
        ``zstandard`` module.

        Raises:
            NotImplementedError: For any other codec.
        """
        if codec == PuffinCompressionCodec.NONE:
            return input_data
        if codec == PuffinCompressionCodec.ZSTD:
            return zstd.compress(input_data)
        raise NotImplementedError(f"Unsupported codec: {codec}")

    @staticmethod
    def decompress(codec: PuffinCompressionCodec, input_data: bytes) -> bytes:
        """Decompress *input_data* that was written with *codec*.

        ``NONE`` returns the input unchanged and ``ZSTD`` delegates to the
        ``zstandard`` module.

        Raises:
            NotImplementedError: For any other codec.
        """
        if codec == PuffinCompressionCodec.NONE:
            return input_data
        if codec == PuffinCompressionCodec.ZSTD:
            return zstd.decompress(input_data)
        raise NotImplementedError(f"Unsupported codec: {codec}")
class PuffinReader:
    """Reads footer metadata and blob payloads from an in-memory Puffin file.

    The footer is laid out as: start magic, JSON payload (optionally
    compressed), then a fixed-size struct holding the payload size, the flag
    bytes and a closing magic. The parsed footer is cached after first use.
    """

    class BlobMetadata:
        """Plain record describing one blob as listed in the footer JSON."""

        def __init__(self, type: str, input_fields: List[str], snapshot_id: int, sequence_number: int,
                     offset: int, length: int, compression_codec: Optional[str], properties: Dict[str, Any]):
            # `type` shadows the builtin, but it is part of the keyword
            # interface callers use, so the name is kept.
            self.type = type
            self.input_fields = input_fields
            self.snapshot_id = snapshot_id
            self.sequence_number = sequence_number
            self.offset = offset
            self.length = length
            self.compression_codec = compression_codec
            self.properties = properties

    def __init__(self, input_file: bytes, file_size: Optional[int] = None, footer_size: Optional[int] = None):
        """Wrap the raw file bytes.

        Args:
            input_file: Complete contents of the Puffin file.
            file_size: File length to assume; defaults to ``len(input_file)``.
            footer_size: Total footer length if already known; when omitted it
                is derived from the trailing footer struct on first use.

        Raises:
            ValueError: If *footer_size* is given but is not positive or does
                not leave room for the leading file magic.
        """
        self.file_size = file_size if file_size is not None else len(input_file)
        self.input = io.BytesIO(input_file)
        self.known_footer_size = footer_size
        self.known_file_metadata: Optional[Dict[str, Any]] = None
        if footer_size is not None:
            if not (0 < footer_size <= self.file_size - len(PuffinFormat.get_magic())):
                raise ValueError(f"Invalid footer size: {footer_size}")

    def file_metadata(self) -> Dict[str, Any]:
        """Return the parsed footer JSON, reading and validating it on first call.

        Raises:
            ValueError: If the footer does not fit in the file, a magic is
                missing, an unknown flag is set, or the recorded payload size
                disagrees with the footer size.
            NotImplementedError: If the footer payload uses a codec that
                ``PuffinFormat.decompress`` does not support.
        """
        if self.known_file_metadata is None:
            footer_size = self.footer_size()
            # Guard against a corrupt payload size; without this the seek
            # below would fail with an opaque "negative seek value" error.
            if footer_size > self.file_size:
                raise ValueError(
                    f"Invalid file: footer size {footer_size} exceeds file length {self.file_size}")
            self.input.seek(self.file_size - footer_size)
            footer = self.input.read(footer_size)

            self.check_magic(footer, PuffinFormat.FOOTER_START_MAGIC_OFFSET)
            footer_struct_offset = footer_size - PuffinFormat.FOOTER_STRUCT_LENGTH
            self.check_magic(footer, footer_struct_offset + PuffinFormat.FOOTER_STRUCT_MAGIC_OFFSET)

            footer_compression = PuffinCompressionCodec.NONE
            if Flag.FOOTER_PAYLOAD_COMPRESSED in self.decode_flags(footer, footer_struct_offset):
                footer_compression = PuffinFormat.FOOTER_COMPRESSION_CODEC

            footer_payload_size = PuffinFormat.read_integer_little_endian(
                footer, footer_struct_offset + PuffinFormat.FOOTER_STRUCT_PAYLOAD_SIZE_OFFSET)
            if footer_size != (PuffinFormat.FOOTER_START_MAGIC_LENGTH + footer_payload_size + PuffinFormat.FOOTER_STRUCT_LENGTH):
                raise ValueError(f"Unexpected footer payload size value {footer_payload_size} for footer size {footer_size}")

            # The payload sits directly after the start magic.
            payload_start = PuffinFormat.FOOTER_START_MAGIC_LENGTH
            footer_payload = footer[payload_start:payload_start + footer_payload_size]
            footer_json = PuffinFormat.decompress(footer_compression, footer_payload)
            self.known_file_metadata = self.parse_file_metadata(footer_json)
        return self.known_file_metadata

    def decode_flags(self, footer: bytes, footer_struct_offset: int) -> set:
        """Decode the flag bytes of the footer struct into a set of ``Flag``.

        Raises:
            ValueError: If a bit is set for which no ``Flag`` member exists.
        """
        flags = set()
        for byte_number in range(PuffinFormat.FOOTER_STRUCT_FLAGS_LENGTH):
            flag_byte = footer[footer_struct_offset + PuffinFormat.FOOTER_STRUCT_FLAGS_OFFSET + byte_number]
            bit_number = 0
            # Walk the byte from the least significant bit upwards.
            while flag_byte != 0:
                if flag_byte & 0x1:
                    flag = Flag.from_bit(byte_number, bit_number)
                    if flag is None:
                        raise ValueError(f"Unknown flag byte {byte_number} and bit {bit_number} set")
                    flags.add(flag)
                flag_byte >>= 1
                bit_number += 1
        return flags

    def read_all(self, blobs: List['PuffinReader.BlobMetadata']) -> List[Tuple['PuffinReader.BlobMetadata', bytes]]:
        """Read and decompress the payload of every blob in *blobs*.

        Blobs are read in ascending offset order; the caller's list is not
        modified.

        Returns:
            ``(metadata, data)`` pairs in ascending offset order.

        Raises:
            ValueError: If a blob extends past the end of the input or names
                an unknown codec.
            NotImplementedError: If a blob's codec is not supported.
        """
        if not blobs:
            return []
        results = []
        # sorted() instead of list.sort() so the caller's list keeps its order.
        for blob_metadata in sorted(blobs, key=lambda b: b.offset):
            self.input.seek(blob_metadata.offset)
            raw_data = self.input.read(blob_metadata.length)
            # A short read means offset/length point outside the file; fail
            # loudly rather than hand back truncated data.
            if len(raw_data) != blob_metadata.length:
                raise ValueError(
                    f"Invalid blob: expected {blob_metadata.length} bytes at offset "
                    f"{blob_metadata.offset}, but got {len(raw_data)}")
            codec = PuffinCompressionCodec.for_name(blob_metadata.compression_codec)
            data = PuffinFormat.decompress(codec, raw_data)
            results.append((blob_metadata, data))
        return results

    def check_magic(self, data: bytes, offset: int) -> None:
        """Verify that *data* holds the Puffin magic at *offset*.

        Raises:
            ValueError: If the bytes at *offset* differ from the magic.
        """
        magic = PuffinFormat.get_magic()
        read_magic = data[offset:offset + len(magic)]
        if read_magic != magic:
            raise ValueError(f"Invalid file: expected magic at offset {offset}: {magic}, but got {read_magic}")

    def footer_size(self) -> int:
        """Return the total footer length, deriving it from the file tail if unknown.

        Raises:
            ValueError: If the file is shorter than the footer struct or the
                struct's magic is missing.
        """
        if self.known_footer_size is None:
            if self.file_size < PuffinFormat.FOOTER_STRUCT_LENGTH:
                raise ValueError(f"Invalid file: file length {self.file_size} is less than minimal length of the footer tail {PuffinFormat.FOOTER_STRUCT_LENGTH}")
            self.input.seek(self.file_size - PuffinFormat.FOOTER_STRUCT_LENGTH)
            footer_struct = self.input.read(PuffinFormat.FOOTER_STRUCT_LENGTH)
            self.check_magic(footer_struct, PuffinFormat.FOOTER_STRUCT_MAGIC_OFFSET)
            footer_payload_size = PuffinFormat.read_integer_little_endian(footer_struct, PuffinFormat.FOOTER_STRUCT_PAYLOAD_SIZE_OFFSET)
            self.known_footer_size = PuffinFormat.FOOTER_START_MAGIC_LENGTH + footer_payload_size + PuffinFormat.FOOTER_STRUCT_LENGTH
        return self.known_footer_size

    def parse_file_metadata(self, data: bytes) -> Dict[str, Any]:
        """Decode *data* as UTF-8 and parse it as JSON."""
        footer_json = data.decode('utf-8')
        return json.loads(footer_json)  # Adjust this to return actual FileMetadata object if needed

    def get_blobs(self) -> List['PuffinReader.BlobMetadata']:
        """Build a ``BlobMetadata`` for every entry under ``blobs`` in the footer.

        ``compression-codec`` and ``properties`` are optional in the Puffin
        spec; a missing codec becomes ``None`` (uncompressed) and missing
        properties become an empty dict.

        Raises:
            KeyError: If a required field is absent from the footer JSON.
        """
        metadata = self.file_metadata()
        return [
            PuffinReader.BlobMetadata(
                type=blob_info['type'],
                input_fields=blob_info['fields'],
                snapshot_id=blob_info['snapshot-id'],
                sequence_number=blob_info['sequence-number'],
                offset=blob_info['offset'],
                length=blob_info['length'],
                compression_codec=blob_info.get('compression-codec'),
                properties=blob_info.get('properties', {}),
            )
            for blob_info in metadata['blobs']
        ]

    def get_created_by(self) -> Optional[str]:
        """Return the file-level ``created-by`` property, or ``None`` if absent."""
        metadata = self.file_metadata()
        return metadata.get('properties', {}).get('created-by', None)

    def close(self) -> None:
        """Close the underlying buffer and drop the cached footer information."""
        self.input.close()
        self.known_footer_size = None
        self.known_file_metadata = None
def main():
    """Command-line entry point: print the metadata of every blob in a Puffin file.

    Parses ``--file`` (required) and ``--pretty``, reads the whole file into
    memory, logs the ``created-by`` property when present, then emits each
    blob's metadata either via ``pprint`` (``--pretty``) or as an INFO log
    record. Decompressed blob data is only logged at DEBUG level.
    """
    parser = argparse.ArgumentParser(description="Puffin File Reader")
    parser.add_argument(
        "--file",
        type=str,
        help="The path to the Puffin file to read.",
        required=True,
    )
    parser.add_argument(
        "--pretty",
        action="store_true",
        help="Pretty print the blob metadata.",
    )
    args = parser.parse_args()

    with open(args.file, 'rb') as f:
        input_file_content = f.read()

    reader = PuffinReader(input_file_content)
    # try/finally so the reader is closed even when the file is malformed and
    # parsing raises.
    try:
        blobs = reader.get_blobs()
        created_by = reader.get_created_by()
        if created_by:
            logging.info("Created by: %s", created_by)
        for blob, data in reader.read_all(blobs):
            if args.pretty:
                pprint.pprint(blob.__dict__)
            else:
                logging.info("Blob: %s", blob.__dict__)
            # Lazy %-formatting: the (possibly large) payload is only rendered
            # when DEBUG logging is actually enabled.
            logging.debug("Data: %s", data)
    finally:
        reader.close()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment