Skip to content

Instantly share code, notes, and snippets.

@kawata-atsushi
Created March 22, 2024 09:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kawata-atsushi/56507fe83bd43d0d3fa4c828415bd202 to your computer and use it in GitHub Desktop.
Save kawata-atsushi/56507fe83bd43d0d3fa4c828415bd202 to your computer and use it in GitHub Desktop.
Sample PySpark script: migrating to an Iceberg table using the add_files procedure.
from pyspark.sql import SparkSession
# --- Spark session with an Iceberg catalog backed by AWS Glue ---------------
# Name under which the Iceberg catalog is registered in Spark SQL; tables in it
# are addressed as `<catalog_name>.<database>.<table>`.
catalog_name = "glue_catalog"
# S3 bucket holding the Iceberg warehouse root.
bucket_name = "stage-ap-northeast-1-cm-zunda-demo"
warehouse_path = f"s3://{bucket_name}/iceberg"

# Spark settings, applied in this order: turn on the Iceberg SQL extensions
# (which provide the `CALL <catalog>.system.<procedure>` syntax), then register
# `catalog_name` as an Iceberg SparkCatalog using the Glue catalog
# implementation and S3FileIO, rooted at `warehouse_path`.
_iceberg_spark_conf = (
    ("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"),
    (f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog"),
    (f"spark.sql.catalog.{catalog_name}.warehouse", warehouse_path),
    (f"spark.sql.catalog.{catalog_name}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog"),
    (f"spark.sql.catalog.{catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO"),
)
_session_builder = SparkSession.builder
for _conf_key, _conf_value in _iceberg_spark_conf:
    _session_builder = _session_builder.config(_conf_key, _conf_value)
spark = _session_builder.getOrCreate()
# Turn Spark's log output off entirely (presumably so the demo's .show()
# output stays readable; Python exceptions are still raised normally).
spark.sparkContext.setLogLevel('OFF')
# --- Migrate the source table into Iceberg with add_files -------------------
# Table names without a catalog prefix resolve against Spark's current catalog,
# which this script does not change (presumably the session catalog,
# `spark_catalog`; confirm for your environment).
source_table = "demo.demo_source_parquet"
# add_files takes the target as `database.table` and resolves it inside
# `catalog_name`; ordinary SQL statements need the fully qualified name.
target_table_in_catalog = "demo.demo_target_iceberg_add_files"
target_table = f"{catalog_name}.{target_table_in_catalog}"

# Create an empty Iceberg (format v2) table with the source's columns:
# `limit 0` carries over the schema but no rows.
create_table_query = f"""
create table if not exists {target_table}
using iceberg
tblproperties ("format-version"="2")
partitioned by (year, element)
as
select * from {source_table} limit 0
"""
# Register the source table's existing data files in the target table's
# metadata; add_files references the files where they are instead of copying.
add_files_query = f"""
call {catalog_name}.system.add_files(
table => '{target_table_in_catalog}',
source_table => '{source_table}'
)
"""
# Append one record through Iceberg, to compare with the imported files.
insert_one_row_query = f"""
insert into {target_table}
select * from {source_table} LIMIT 1
"""
# Iceberg metadata tables, queried after each step to show its effect.
query_select_files = f"select file_path from {target_table}.files"
query_select_snapshots = f"select snapshot_id, manifest_list from {target_table}.snapshots"


def _show_table_state():
    """Print the target table's data files and snapshots (first 10 rows each)."""
    spark.sql(query_select_files).show(10, truncate=False)
    spark.sql(query_select_snapshots).show(10, truncate=False)


# NOTE: `if not exists` makes table creation re-runnable, but add_files is not.
# A second run is expected to be rejected by add_files' duplicate-file check
# (`check_duplicate_files`, on by default). Reset the target table before
# re-running, and avoid `DROP TABLE ... PURGE`: it would delete data files the
# table references in place, which here include the source table's files.
try:
    spark.sql(create_table_query)
    # List the tables in the `demo` database of the current catalog.
    spark.sql("use demo")
    spark.sql("show tables").show(truncate=False)
    _show_table_state()

    spark.sql(add_files_query).show(truncate=False)
    _show_table_state()

    spark.sql(insert_one_row_query)
    _show_table_state()
finally:
    # Release the Spark session even when one of the steps above raises.
    spark.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment