// Intended to be run in spark-shell with the Hudi Spark bundle on the classpath;
// `spark` refers to the active SparkSession.
import org.apache.spark.sql.SaveMode._
import spark.implicits._

// Simulate a dimension table for stocks.
case class Company(symbol: String, name: String, yearFounded: Int)

val records = Seq(
  Company("MSFT", "Microsoft", 1975)
)
records.toDF("symbol", "name", "yearFounded").createOrReplaceTempView("companies")
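// `incrementalDF` below is assumed to hold the changed stock records produced by
// an earlier step that is not included in this gist. A minimal sketch of one way
// to produce it is a Hudi incremental query; the source path /tmp/hudi/stocks
// and the begin instant are placeholders, not values from the original gist.
val beginTime = "000" // placeholder: fetch every commit after this instant time
val incrementalDF = spark.read.format("hudi").
  option("hoodie.datasource.query.type", "incremental").
  option("hoodie.datasource.read.begin.instanttime", beginTime).
  load("/tmp/hudi/stocks")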
// Join the changed records with the dimension table.
incrementalDF.createOrReplaceTempView("changedStocks")

val result = spark.sql("""
  |SELECT s.*, c.name, c.yearFounded
  |FROM changedStocks s
  |JOIN companies c
  |ON s.symbol = c.symbol
  """.stripMargin)
// Write the joined result out as a wide Hudi table, keyed by symbol, partitioned
// by date, and deduplicated on ts. This assumes the changedStocks records carry
// date and ts columns, which are pulled through by s.* in the join above.
val wideTablePath = "/tmp/hudi/stocks_wide"
result.write.format("hudi").
  option("hoodie.datasource.write.recordkey.field", "symbol").
  option("hoodie.datasource.write.partitionpath.field", "date").
  option("hoodie.datasource.write.precombine.field", "ts").
  option("hoodie.table.name", "stocks_wide").
  mode(Append).
  save(wideTablePath)
// Query the wide table to see all of the columns, including the joined-in
// company name and yearFounded.
spark.read.format("hudi").load(wideTablePath).show()
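// A follow-up sketch, not part of the original gist: the wide table is itself a
// Hudi table, so downstream consumers can read only the rows changed since a
// given commit with an incremental query. The begin instant is a placeholder.
val wideBeginTime = "000" // placeholder: fetch every commit after this instant time
val wideIncrementalDF = spark.read.format("hudi").
  option("hoodie.datasource.query.type", "incremental").
  option("hoodie.datasource.read.begin.instanttime", wideBeginTime).
  load(wideTablePath)
wideIncrementalDF.select("symbol", "name", "yearFounded").show()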