nadine farah (nfarah86), GitHub Gists

import org.apache.spark.sql.catalyst.expressions.{Add, If, Literal}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Column, Dataset, Row}
import org.apache.spark.sql.SaveMode._
// initialize `employee_country` table with CDC enabled
val employeeCountryTablePath = "/tmp/hudi/employee_country"
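// a minimal, hedged sketch of the bootstrap step described above; the employee
// schema, key/partition fields, and table name are assumptions, not from the
// original snippet. The point being shown is the CDC flag
// `hoodie.table.cdc.enabled` (available since Hudi 0.13.0).
val employeesDF = Seq(
  (1, "alice", "US", 1000L),
  (2, "bob", "CA", 1000L)
).toDF("id", "name", "country", "ts")
employeesDF.write.format("hudi").
  option("hoodie.datasource.write.recordkey.field", "id").
  option("hoodie.datasource.write.partitionpath.field", "country").
  option("hoodie.datasource.write.precombine.field", "ts").
  option("hoodie.table.name", "employee_country").
  option("hoodie.table.cdc.enabled", "true").
  mode(Overwrite).
  save(employeeCountryTablePath)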
import org.apache.spark.sql.SaveMode._
// simulate a dimension table for stocks
case class Company(symbol: String, name: String, yearFounded: Int)
val records = Seq(
Company("MSFT", "Microsoft", 1975)
)
records.toDF("symbol", "name", "yearFounded").createOrReplaceTempView("companies")
// join the changed records with the dimension table
val basePath = "/tmp/hudi/stocks"
spark.read.format("hudi").load(basePath).createOrReplaceTempView("hudi_stocks")
val firstCommit = spark.
  sql("select distinct(_hoodie_commit_time) as commitTime from hudi_stocks order by commitTime").
  map(k => k.getString(0)).take(1)(0)
// incremental query: only records changed after the first commit
val incrementalDF = spark.read.format("hudi").
  option("hoodie.datasource.query.type", "incremental").
  option("hoodie.datasource.read.begin.instanttime", firstCommit).
  load(basePath)
# inspect the Hudi timeline for the stocks table (commit, deltacommit, and other metadata files)
ls -l /tmp/hudi/stocks/.hoodie/
import org.apache.spark.sql.SaveMode._
val basePath = "/tmp/hudi/stocks"
val stocksDF1 = spark.read.json("docker/demo/data/batch_1.json")
val stocksDF2 = spark.read.option("multiline", "true").json("docker/demo/data/batch_2.json").limit(1)
stocksDF1.write.format("hudi").
  option("hoodie.datasource.write.recordkey.field", "symbol").
  option("hoodie.datasource.write.partitionpath.field", "date").
  option("hoodie.datasource.write.precombine.field", "ts").
  option("hoodie.table.name", "stocks").  // table name assumed from basePath
  mode(Overwrite).
  save(basePath)
# launch spark-shell with the Hudi 0.13.0 bundle for Spark 3.2 / Scala 2.12
spark-shell \
  --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.13.0 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
# clone Hudi to get hudi-cli, used for the savepoint commands below
git clone git@github.com:apache/hudi.git
cd <path>/hudi
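# a hedged sketch, not from the original snippet, of the steps that usually come
# before the rollback below: start hudi-cli from the cloned repo, connect to the
# table, and create a savepoint. The path and <CommitTime> are placeholders.
hudi-cli/hudi-cli.sh
connect --path /tmp/hudi/stocks
commits show
savepoint create --commit <CommitTime> --sparkMaster local[2]
savepoints show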
# inside hudi-cli: roll the table back to a savepoint (replace [SavepointTime] with a real instant)
savepoint rollback --savepoint [SavepointTime] --sparkMaster local[2]
# list the timeline metadata under .hoodie for your table (or the sample dataset)
cd ToYourHudiFolderOrSampleDataset
ls -g .hoodie