Skip to content

Instantly share code, notes, and snippets.

@timoguin
Created January 24, 2024 23:01
Show Gist options
  • Save timoguin/39b3d8304fc9ae293e597b19a0a7b2f3 to your computer and use it in GitHub Desktop.
Save timoguin/39b3d8304fc9ae293e597b19a0a7b2f3 to your computer and use it in GitHub Desktop.
Working with Iceberg tables in Athena notebooks (PySpark)
from pyspark.sql import DataFrame
import boto3
# Region for the shared Glue client; change this if your Glue catalog lives elsewhere.
GLUE_REGION = "us-east-2"
GLUE_CLIENT = boto3.client("glue", region_name=GLUE_REGION)
def get_table_metadata_location(glue_client, table: str) -> str:
    """
    Return the S3 path of a Glue-catalogued table's current metadata file.

    Args:
        glue_client: A boto3 Glue client (anything exposing ``get_table``).
        table: Table name in the format ``"database_name.table_name"``.

    Returns:
        The value of the table's ``metadata_location`` parameter.

    Raises:
        ValueError: If ``table`` is not exactly ``database_name.table_name``
            with both parts non-empty.
        KeyError: If the table has no ``metadata_location`` parameter.
    """
    database_name, sep, table_name = table.partition(".")
    # Reject "db", "db.", ".tbl" and "db.tbl.extra" up front instead of
    # sending a malformed name to Glue.
    if not sep or not database_name or not table_name or "." in table_name:
        raise ValueError(
            f"Expected table in the format 'database_name.table_name', got {table!r}"
        )
    table_details = glue_client.get_table(DatabaseName=database_name, Name=table_name)
    # "Parameters" is optional in the GetTable response, so don't index it blindly.
    parameters = table_details["Table"].get("Parameters") or {}
    try:
        return parameters["metadata_location"]
    except KeyError:
        raise KeyError(
            f"Table {table!r} has no 'metadata_location' parameter "
            "(it may not be an Iceberg table)"
        ) from None
def get_table_history(table: str, *, spark_session=None) -> "DataFrame":
    """
    Return a DataFrame joining an Iceberg table's history with its snapshot info.

    Args:
        table: Table name as used in Spark SQL, e.g. ``"database_name.table_name"``;
            its ``history`` and ``snapshots`` metadata tables are queried.
        spark_session: Optional SparkSession to run the query on. Defaults to
            the notebook's global ``spark`` session.

    Returns:
        A DataFrame with every ``history`` column plus ``committed_at`` and
        ``operation`` from ``snapshots``, ordered by ``committed_at`` descending.
    """
    # Resolve the global lazily so callers passing a session don't need `spark` defined.
    session = spark_session if spark_session is not None else spark
    return session.sql(
        f"""
        SELECT
            h.*,
            s.committed_at,
            s.operation
        FROM {table}.history h
        JOIN {table}.snapshots s
            ON h.snapshot_id = s.snapshot_id
        ORDER BY committed_at DESC
        """
    )
def get_incremental_read(
    glue_client,
    table: str,
    start_snapshot_id: str,
    end_snapshot_id: str = None,
    *,
    spark_session=None,
) -> "DataFrame":
    """
    Incrementally read an Iceberg table between two snapshots.

    The table is loaded directly from its current metadata file, whose path
    is looked up in Glue via ``get_table_metadata_location``.

    Args:
        glue_client: A boto3 Glue client used to look up the metadata location.
        table: Table name in the format ``"database_name.table_name"``.
        start_snapshot_id: Passed to the reader as the ``start-snapshot-id`` option.
        end_snapshot_id: Passed as ``end-snapshot-id`` when given; the option
            is omitted otherwise.
        spark_session: Optional SparkSession; defaults to the notebook's
            global ``spark`` session.

    Returns:
        The DataFrame produced by the Iceberg reader.
    """
    metadata_location = get_table_metadata_location(glue_client, table)
    session = spark_session if spark_session is not None else spark
    reader = session.read.format("iceberg").option("start-snapshot-id", start_snapshot_id)
    # Only pin an end snapshot when one was requested; otherwise the option is
    # left to Iceberg's default (presumably the current snapshot — see Iceberg docs).
    if end_snapshot_id is not None:
        reader = reader.option("end-snapshot-id", end_snapshot_id)
    return reader.load(metadata_location)
def create_changelog_view(
    table: str,
    view_name: str,
    start_snapshot_id: str,
    end_snapshot_id: str = None,
    *,
    catalog: str = "spark_catalog",
    spark_session=None,
) -> "DataFrame":
    """
    Create a view over the changes between two snapshots of an Iceberg table.

    Calls the Iceberg ``create_changelog_view`` Spark procedure.

    NOTE: The net_changes option is not available for the current built-in
    version of the Spark Iceberg extensions.

    Args:
        table: Table to read changes from, e.g. ``"database_name.table_name"``.
        view_name: Name to give the changelog view.
        start_snapshot_id: Value for the ``start-snapshot-id`` option.
        end_snapshot_id: Value for the ``end-snapshot-id`` option; when omitted
            the option is not passed and the procedure defaults to the most
            current snapshot.
        catalog: Catalog whose ``system.create_changelog_view`` procedure is
            called. Defaults to ``"spark_catalog"``.
        spark_session: Optional SparkSession; defaults to the notebook's
            global ``spark`` session.

    Returns:
        The DataFrame produced by the ``CALL`` statement (not a plain string).
        NOTE(review): per the Iceberg procedure docs this presumably holds a
        single ``changelog_view`` column with the view's name — confirm.
    """
    option_pairs = [("start-snapshot-id", start_snapshot_id)]
    if end_snapshot_id is not None:
        option_pairs.append(("end-snapshot-id", end_snapshot_id))
    # Spark SQL's map() takes alternating key, value arguments separated by
    # commas; the previous 'key': 'value' form was a syntax error.
    options = "map(" + ", ".join(f"'{key}', '{value}'" for key, value in option_pairs) + ")"
    session = spark_session if spark_session is not None else spark
    return session.sql(
        f"""
        CALL {catalog}.system.create_changelog_view(
            table => '{table}',
            changelog_view => '{view_name}',
            options => {options}
        )
        """
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment