Skip to content

Instantly share code, notes, and snippets.

@kawata-atsushi
Created March 22, 2024 13:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kawata-atsushi/b00fb9719a72c8263b7bd922407487ad to your computer and use it in GitHub Desktop.
Save kawata-atsushi/b00fb9719a72c8263b7bd922407487ad to your computer and use it in GitHub Desktop.
Sample PySpark script: migrate a Parquet table to an Iceberg table using CTAS (CREATE TABLE AS SELECT).
from pyspark.sql import SparkSession
# Migrate the Parquet table `demo.demo_source_parquet` into a new Iceberg
# table with a single CTAS (CREATE TABLE ... AS SELECT) statement, then print
# the tables in `demo` plus the new table's data files and snapshots.

catalog_name = "glue_catalog"
bucket_name = "stage-ap-northeast-1-cm-zunda-demo"
warehouse_path = f"s3://{bucket_name}/iceberg"

# Fully qualified name of the Iceberg table created by the CTAS below; built
# once so the DDL and the metadata queries cannot drift apart.
target_table = f"{catalog_name}.demo.demo_target_iceberg_ctas"

# Register `catalog_name` as an Iceberg SparkCatalog that is implemented by
# GlueCatalog, uses S3FileIO for file access and `warehouse_path` as its
# warehouse location.
spark = (
    SparkSession.builder
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config(f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog")
    .config(f"spark.sql.catalog.{catalog_name}.warehouse", warehouse_path)
    .config(f"spark.sql.catalog.{catalog_name}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config(f"spark.sql.catalog.{catalog_name}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .getOrCreate()
)

# create table as select
# The source table is referenced without a catalog prefix, so it is resolved
# through the session's current catalog rather than `catalog_name`.
# NOTE(review): the `order by` on the partition columns presumably clusters
# rows per partition before the write -- confirm whether the Iceberg version
# in use still requires it.
query = f"""
create table {catalog_name}.demo.`demo_target_iceberg_ctas`
using iceberg
tblproperties ("format-version"="2")
partitioned by (year,element)
as
select * from demo.`demo_source_parquet`
order by year, element
"""

# Iceberg metadata tables of the new table: its data files and its snapshots.
query_select_files = f"select file_path from {target_table}.files"
query_select_snapshots = f"select snapshot_id, manifest_list from {target_table}.snapshots"

try:
    # Turn Spark logging off so only the `show()` output is printed.
    spark.sparkContext.setLogLevel('OFF')

    spark.sql(query)

    # List the tables in the `demo` database of the current catalog.
    spark.sql("use demo")
    spark.sql("show tables").show(truncate=False)

    spark.sql(query_select_files).show(10, truncate=False)
    spark.sql(query_select_snapshots).show(10, truncate=False)
finally:
    # Always release the session, even when one of the statements above
    # fails (e.g. the target table already exists).
    spark.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment