Skip to content

Instantly share code, notes, and snippets.

@mattfysh
Created April 4, 2024 00:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattfysh/9177a229ed06de9043bfb963e24c4097 to your computer and use it in GitHub Desktop.
Save mattfysh/9177a229ed06de9043bfb963e24c4097 to your computer and use it in GitHub Desktop.
Read a Delta table as a stream.
import json
import os
from typing import Any
from urllib.parse import unquote
# commitInfo "operation" values whose commits are stepped over when scanning
# the transaction log for newly added data files.
SKIP_OPERATIONS = [
    "OPTIMIZE",
    "VACUUM START",
    "VACUUM END",
]
def is_subset(a: dict[str, Any], b: dict[str, Any]) -> bool:
    """Tell whether every key/value pair of *a* also appears in *b*.

    An empty *a* is a subset of any *b*.
    """
    # dict item views are set-like, so ``<=`` is the subset test over
    # (key, value) pairs.
    return a.items() <= b.items()
def file_version(filename: str) -> int:
    """Return the integer found before the first ``.`` in *filename*.

    For a commit file such as ``"00000000000000000012.json"`` this is the
    commit version, ``12``.
    """
    version_text, _, _ = filename.partition(".")
    return int(version_text)
def _list_commit_files(log_path: str) -> list[str]:
    """Return the commit file names in *log_path*, ordered by version.

    Only names of the exact form ``<digits>.json`` are treated as commits.
    Other ``.json`` files that may live in ``_delta_log`` are ignored; for
    example, the Delta protocol defines log-compaction files named
    ``<start>.<end>.compacted.json``.
    """
    commits = [
        name
        for name in os.listdir(log_path)
        if name.endswith(".json") and name[: -len(".json")].isdecimal()
    ]
    # Sort numerically so ordering does not depend on zero-padding.
    return sorted(commits, key=file_version)


def _read_commit(
    commit_filepath: str,
) -> tuple[dict[str, Any], list[tuple[str, Any]]]:
    """Parse one commit file into its ``commitInfo`` payload and other actions.

    Each non-blank line is a JSON object whose first key is the action type.
    ``commitInfo`` is picked out wherever it appears in the file; its
    position is not relied upon.

    Returns:
        ``(commit_info, actions)``: ``actions`` is a list of
        ``(action_type, spec)`` pairs in file order, excluding ``commitInfo``.

    Raises:
        Exception: if the file contains no ``commitInfo`` action.
    """
    commit_info = None
    actions = []
    # Iterate physical lines rather than using str.splitlines(), which would
    # also split on Unicode line separators that are legal inside JSON strings.
    with open(commit_filepath, "r", encoding="utf-8") as file:
        for line in file:
            if not line.strip():
                continue
            (action, spec), *_ = json.loads(line).items()
            if action == "commitInfo":
                commit_info = spec
            else:
                actions.append((action, spec))
    if commit_info is None:
        raise Exception(f"No commitInfo action found in {commit_filepath}")
    return commit_info, actions


def get_incremental_files_for_partition(
    partition_values: dict[str, str], table_path: str, cursor: int
):
    """Collect the data files appended to one partition since version *cursor*.

    Scans the commit files in ``<table_path>/_delta_log`` whose version is
    ``>= cursor``, in version order. Commits whose operation is listed in
    ``SKIP_OPERATIONS`` are stepped over. In every other commit, each
    ``add``/``remove`` action that may belong to the requested partition must
    be an ``add`` made by ``CREATE TABLE`` or by a ``WRITE`` in ``Append``
    mode; any other such action raises.

    Args:
        partition_values: Column -> value pairs identifying the partition. A
            file matches when its ``partitionValues`` contain all of them.
        table_path: Filesystem path of the table root.
        cursor: First commit version to read (inclusive).

    Returns:
        ``None`` when ``_delta_log`` holds no commit files. Otherwise a tuple
        ``(files, next_cursor)``:

        * ``files`` holds the matching data-file paths, URI-decoded and
          joined onto ``table_path``, in commit order.
        * ``next_cursor`` is the version to pass as ``cursor`` on the next
          call. It also advances past skipped commits so they are not
          re-read.

    Raises:
        Exception: in any of these cases:

            * ``cursor`` precedes the earliest commit file.
            * A commit version at or after ``cursor`` is missing.
            * A commit lacks ``commitInfo``.
            * A matching action occurs in an incompatible operation.
            * An unsupported action type is found.
    """
    log_path = os.path.join(table_path, "_delta_log")
    commits = _list_commit_files(log_path)
    if not commits:
        return None
    if file_version(commits[0]) > cursor:
        raise Exception(f"Version {cursor} precedes the earliest commit")

    files = []
    next_cursor = cursor
    for commit in commits:
        version = file_version(commit)
        if version < cursor:
            continue
        # Versions must be contiguous from the cursor onwards; otherwise a
        # missing commit's files would be silently dropped.
        if version != next_cursor:
            raise Exception(
                f"Version {next_cursor} could not be located at expected index, are there entries missing from the transaction log?"
            )

        commit_info, actions = _read_commit(os.path.join(log_path, commit))
        operation = commit_info["operation"]
        if operation in SKIP_OPERATIONS:
            next_cursor = version + 1
            continue

        is_append_only = operation == "CREATE TABLE" or (
            operation == "WRITE"
            and commit_info["operationParameters"]["mode"] == "Append"
        )
        for action, spec in actions:
            if action in ("add", "remove"):
                # partitionValues is optional on "remove" actions; when absent
                # the file cannot be ruled out of this partition.
                file_partition = spec.get("partitionValues")
                if file_partition is not None and not is_subset(
                    partition_values, file_partition
                ):
                    continue
                if action == "add" and is_append_only:
                    # Paths in the log are URI-encoded; decode before joining.
                    files.append(os.path.join(table_path, unquote(spec["path"])))
                else:
                    raise Exception(
                        f"Found action for partition within incompatible commit operation '{operation}'"
                    )
            elif action not in ("protocol", "metaData"):
                raise Exception(f"Not implemented: {action}")
        next_cursor = version + 1
    return files, next_cursor
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment