Last active
March 12, 2024 11:42
-
-
Save ad1happy2go/e7a2f8c695fde4c3db060a7113610931 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.sql import SparkSession, Row | |
from pyspark.sql.functions import col | |
def initialize_spark_session():
    """Build and return a SparkSession preconfigured for Apache Hudi.

    Configures the Kryo serializer, registers the Hudi Spark session
    extension, and pulls in the Hudi Spark 3.2 bundle via
    ``spark.jars.packages``.

    Returns:
        An active SparkSession.
    """
    builder = SparkSession.builder.appName("Hudi Incremental Read Example")
    # Kryo is required by Hudi for efficient serialization of its internal types.
    builder = builder.config(
        "spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    # Enables Hudi-specific SQL syntax and planning rules.
    builder = builder.config(
        "spark.sql.extensions",
        "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    builder = builder.config(
        "spark.jars.packages",
        "org.apache.hudi:hudi-spark3.2-bundle_2.12:0.14.1")
    return builder.getOrCreate()
def create_initial_dataframe(spark):
    """Return a two-row sample DataFrame with id/name/timestamp columns.

    Args:
        spark: active SparkSession used to build the DataFrame.

    Returns:
        DataFrame with schema ``id int, name string, timestamp string``.
    """
    rows = [
        Row(id=1, name="John", timestamp="2021-01-01"),
        Row(id=2, name="Jane", timestamp="2021-01-02"),
    ]
    return spark.createDataFrame(rows, "id int, name string, timestamp string")
def write_dataframe_to_hudi(df, basePath):
    """Write *df* to a copy-on-write Hudi table at *basePath*.

    The table is keyed on ``id``, partitioned and pre-combined on
    ``timestamp``, and written with ``overwrite`` save mode (any existing
    table at the path is replaced).

    Args:
        df: DataFrame to persist.
        basePath: filesystem/URI path of the Hudi table.
    """
    options = {
        "hoodie.table.name": "my_hudi_table",
        "hoodie.datasource.write.recordkey.field": "id",
        "hoodie.datasource.write.partitionpath.field": "timestamp",
        "hoodie.datasource.write.precombine.field": "timestamp",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
    }
    writer = df.write.format("hudi").options(**options).mode("overwrite")
    writer.save(basePath)
def perform_incremental_read(spark, basePath, begin_instanttime=0):
    """Perform an incremental read from the Hudi table at *basePath*.

    The original docstring promised "starting from the given commit time"
    but hard-coded the begin instant to 0; the instant is now a parameter
    whose default (0, i.e. read everything committed so far) preserves the
    original behavior.

    Args:
        spark: active SparkSession.
        basePath: filesystem/URI path of the Hudi table.
        begin_instanttime: Hudi commit instant to start reading from;
            0 fetches all commits. NOTE(review): Hudi treats this bound as
            exclusive per its incremental-query docs — confirm for the
            pinned Hudi version.

    Returns:
        DataFrame containing records committed after *begin_instanttime*.
    """
    return (spark.read.format("hudi")
            .option("hoodie.datasource.query.type", "incremental")
            .option("hoodie.datasource.read.begin.instanttime", begin_instanttime)
            .load(basePath))
def main():
    """End-to-end demo: write a sample Hudi table, then read it back incrementally.

    Fix: the original never stopped the SparkSession when any step raised,
    leaking the session/JVM resources; the work is now wrapped in
    try/finally so ``spark.stop()`` always runs.
    """
    spark = initialize_spark_session()
    # Base path of the demo Hudi table on the local filesystem.
    base_path = "/tmp/test11"
    try:
        # Create and write the initial DataFrame to the Hudi table.
        df = create_initial_dataframe(spark)
        write_dataframe_to_hudi(df, base_path)
        # Incrementally read everything committed so far and display it.
        incremental_df = perform_incremental_read(spark, base_path)
        incremental_df.show()
    finally:
        # Always release the Spark session, even on failure.
        spark.stop()


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment