Skip to content

Instantly share code, notes, and snippets.

@nsivabalan
Last active August 26, 2022 20:46
Show Gist options
  • Save nsivabalan/445c17352046a6cdee51534933549e48 to your computer and use it in GitHub Desktop.
Save nsivabalan/445c17352046a6cdee51534933549e48 to your computer and use it in GitHub Desktop.
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.sql.SaveMode._
// Streaming source for the Hudi sink: poll inputPath for newly arrived text files
// once per second, reusing the SparkContext that backs the shell's `spark` session.
// NOTE(review): per the Spark Streaming docs, textFileStream only picks up files that
// appear in the directory after ssc.start() (ideally moved in atomically); files that
// already exist there are ignored.
val sc = spark.sparkContext
val ssc = new StreamingContext(sc, batchDuration = Seconds(1))
val inputPath = "/tmp/inputDir/"
val dStream = ssc.textFileStream(directory = inputPath)
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips_cow"
// basePath was previously only defined in the separate Parquet snippet, so
// save(basePath) below could not resolve in a fresh shell. This location follows the
// Hudi quickstart convention.
// NOTE(review): adjust to the intended table location.
val basePath = "file:///tmp/hudi_trips_cow"
// Not used below; retained from the Hudi quickstart.
val dataGen = new DataGenerator
// Write each non-empty micro-batch from dStream into the Hudi table at basePath.
dStream.foreachRDD { rdd =>
  // The stream fires every batch interval even when no new files arrived; skip those.
  if (!rdd.isEmpty()) {
    // rdd.toDF() yields a single `value: string` column, so the record-key,
    // precombine and partition-path fields configured below would not exist.
    // Parse the raw lines into named columns before writing.
    // NOTE(review): assumes the input files are CSV with a header row (the field
    // names match the NYC taxi trip CSV layout). Confirm against the actual files.
    val lines = rdd.toDS()
    val batchDf = lines.sparkSession.read.
      option("header", "true").
      csv(lines)
    // NOTE(review): tpep_pickup_datetime is not guaranteed unique per trip. With
    // Hudi's default upsert operation, rows sharing it are reduced to one record
    // using the precombine field.
    batchDf.write.format("hudi").
      options(getQuickstartWriteConfigs).
      option(PRECOMBINE_FIELD_OPT_KEY, "tpep_dropoff_datetime").
      option(RECORDKEY_FIELD_OPT_KEY, "tpep_pickup_datetime").
      option(PARTITIONPATH_FIELD_OPT_KEY, "VendorID").
      option(TABLE_NAME, tableName).
      mode(Append).
      save(basePath)
  }
}
// The shell keeps the driver alive; a standalone app would also need
// ssc.awaitTermination().
ssc.start()
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.sql.SaveMode._
// Streaming source for the Parquet sink: a one-second micro-batch stream over text
// files landing in inputPath, built on the shell session's SparkContext.
// NOTE(review): per the Spark Streaming docs, only files that show up in the
// directory after ssc.start() are processed; pre-existing files are skipped.
val sc = spark.sparkContext
val ssc = new StreamingContext(sc, batchDuration = Seconds(1))
val inputPath = "/tmp/inputDir/"
val dStream = ssc.textFileStream(directory = inputPath)
// Output directory for the Parquet files written by each micro-batch.
val basePath = "file:///tmp/parquet"
// Append each non-empty micro-batch to basePath as Parquet. rdd.toDF() gives a
// single `value: string` column with one row per input line.
dStream.foreachRDD { rdd =>
  // Skip empty micro-batches. The stream fires every batch interval whether or not
  // new files arrived, and writing an empty DataFrame still commits output (it can
  // leave schema-only part files). That makes the table readable yet empty and
  // piles up small files.
  if (!rdd.isEmpty()) {
    val batchDf = rdd.toDF()
    batchDf.write.format("parquet").
      mode(Append).
      save(basePath)
  }
}
// The shell keeps the driver alive; a standalone app would also need
// ssc.awaitTermination().
ssc.start()
// Load everything written so far under the Parquet output directory.
val df = spark.read.parquet("/tmp/parquet/")
df: org.apache.spark.sql.DataFrame = [value: string]
scala> df.printSchema
root
|-- value: string (nullable = true)
scala> df.show(2, false)
+-----+
|value|
+-----+
+-----+
scala> df.count
res2: Long = 0
@guanziyue
Copy link

guanziyue commented Aug 26, 2022

I'm not sure what state the Hudi timeline is in. It looks like the input stream didn't produce any data. Are there any logs from the Hudi client side?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment