from pyspark.sql import SparkSession, Row


def initialize_spark_session():
    """
    Initialize and return a Spark session configured for Hudi.
    """
    spark = SparkSession.builder \
        .appName("Hudi Incremental Read Example") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
        .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.2-bundle_2.12:0.14.1") \
        .getOrCreate()
    return spark


def create_initial_dataframe(spark):
    """
    Create an initial DataFrame with sample data.
    """
    data = [Row(id=1, name="John", timestamp="2021-01-01"),
            Row(id=2, name="Jane", timestamp="2021-01-02")]
    schema = "id int, name string, timestamp string"
    return spark.createDataFrame(data, schema)


def write_dataframe_to_hudi(df, basePath):
    """
    Write the DataFrame to a Hudi table.
    """
    hudiOptions = {
        "hoodie.table.name": "my_hudi_table",
        "hoodie.datasource.write.recordkey.field": "id",
        "hoodie.datasource.write.partitionpath.field": "timestamp",
        "hoodie.datasource.write.precombine.field": "timestamp",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    }
    df.write.format("hudi") \
        .options(**hudiOptions) \
        .mode("overwrite") \
        .save(basePath)
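

# An incremental query is most useful once the table has more than one commit.
# A minimal sketch of generating a second commit by upserting a changed row,
# assuming the same table options as write_dataframe_to_hudi (this helper is
# illustrative and not part of the original gist):
def upsert_updated_records(spark, basePath):
    """
    Upsert a changed row so the table's timeline gains a second commit.
    """
    updates = [Row(id=1, name="Johnny", timestamp="2021-01-01")]
    schema = "id int, name string, timestamp string"
    updates_df = spark.createDataFrame(updates, schema)
    hudiOptions = {
        "hoodie.table.name": "my_hudi_table",
        "hoodie.datasource.write.recordkey.field": "id",
        "hoodie.datasource.write.partitionpath.field": "timestamp",
        "hoodie.datasource.write.precombine.field": "timestamp",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    }
    # mode("append") keeps the existing table and adds a new commit,
    # unlike the mode("overwrite") used for the initial write above
    updates_df.write.format("hudi") \
        .options(**hudiOptions) \
        .mode("append") \
        .save(basePath)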


def perform_incremental_read(spark, basePath):
    """
    Perform an incremental read from the Hudi table. The begin instant time
    is set to "0", i.e. the start of the timeline, so records from every
    commit are returned.
    """
    tripsIncrementalDF = spark.read.format("hudi") \
        .option("hoodie.datasource.query.type", "incremental") \
        .option("hoodie.datasource.read.begin.instanttime", "0") \
        .load(basePath)
    return tripsIncrementalDF
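

# To read only the changes after a specific commit rather than the whole
# timeline, the begin instant time can be taken from the table's metadata
# column _hoodie_commit_time. A minimal sketch (the helper name is ours,
# not part of the original gist):
def get_commit_times(spark, basePath):
    """
    Return the table's commit times in ascending order.
    """
    return [row[0] for row in spark.read.format("hudi")
            .load(basePath)
            .select("_hoodie_commit_time")
            .distinct()
            .orderBy("_hoodie_commit_time")
            .collect()]
# Because the begin instant time is exclusive, passing the second-to-last
# commit time as hoodie.datasource.read.begin.instanttime would return only
# the records written by the most recent commit.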


def main():
    # Initialize the Spark session
    spark = initialize_spark_session()

    # Define the base path for the Hudi table
    basePath = "/tmp/test11"

    # Create and write the initial DataFrame to the Hudi table
    df = create_initial_dataframe(spark)
    write_dataframe_to_hudi(df, basePath)

    # Perform an incremental read from the start of the timeline
    incremental_df = perform_incremental_read(spark, basePath)
    incremental_df.show()

    # Stop the Spark session
    spark.stop()


if __name__ == "__main__":
    main()
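
# One possible way to run this script (the script file name is illustrative;
# the bundle must match your Spark and Scala versions, as pinned above):
#   spark-submit --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.14.1 hudi_incremental_read.py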