@ad1happy2go
Created March 6, 2024 13:02
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit
// Build the Hudi write options, including BigQuery meta sync via BigQuerySyncTool
def setupHudiOptions(tableName: String, basePath: String): Map[String, String] = {
  Map(
    TABLE_NAME -> tableName,
    "hoodie.datasource.write.table.type" -> "COPY_ON_WRITE",
    // Settings used by the Hudi/BigQuery integration: hive-style partitioning,
    // partition columns dropped from data files, base-format partition metafiles
    "hoodie.datasource.write.drop.partition.columns" -> "true",
    "hoodie.partition.metafile.use.base.format" -> "true",
    "hoodie.datasource.write.hive_style_partitioning" -> "true",
    "hoodie.metadata.enable" -> "true",
    "hoodie.meta.sync.client.tool.class" -> "org.apache.hudi.gcp.bigquery.BigQuerySyncTool",
    "hoodie.gcp.bigquery.sync.project_id" -> "<>",   // fill in your GCP project id
    "hoodie.gcp.bigquery.sync.dataset_name" -> "<>", // fill in your BigQuery dataset name
    "hoodie.gcp.bigquery.sync.dataset_location" -> "us-central1",
    "hoodie.gcp.bigquery.sync.source_uri" -> s"$basePath/partitionPath=*",
    "hoodie.gcp.bigquery.sync.source_uri_prefix" -> s"$basePath/",
    "hoodie.gcp.bigquery.sync.base_path" -> basePath,
    "hoodie.datasource.meta.sync.enable" -> "true"
  )
}
// Write a DataFrame to the Hudi table; each commit triggers BigQuery meta sync
def writeToHudi(df: DataFrame, tableName: String, basePath: String, mode: String = "append") = {
  df.write.format("hudi")
    .options(setupHudiOptions(tableName, basePath))
    .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
    .option(RECORDKEY_FIELD_OPT_KEY, "uuid")
    .option(PARTITIONPATH_FIELD_OPT_KEY, "partitionPath")
    .mode(mode)
    .save(basePath)
}
val tableName = "issue_bqsync_10829"
val basePath = "gs://hudi-cli-test/community/github/" + tableName
val dataGen = new DataGenerator
val inserts = convertToStringList(dataGen.generateInserts(200))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
// Initial write: all records land in a single partition "abc"
writeToHudi(df.withColumn("partitionPath", lit("abc")), tableName, basePath, "overwrite")
// Second write: add a new column to exercise schema evolution on the synced table
writeToHudi(df.withColumn("partitionPath", lit("abc")).withColumn("newColumn", lit(1)), tableName, basePath, "append")