
@bvaradar
Last active April 4, 2022 03:17
Structured Streaming simple testbed setup using Kafka
# Download the Confluent Platform zip locally: https://www.confluent.io/download/?_ga=2.149291254.1340520780.1594928883-290224092.1594928883&_gac=1.220398892.1594951593.EAIaIQobChMIm6Cmz5nT6gIVCa_ICh0IeAjlEAAYASAAEgLWkfD_BwE
# Choose the zip option, unzip after download, and set it up in your home directory.
export CONFLUENT_HOME=<path_to_confluent_home>
export PATH=$PATH:$CONFLUENT_HOME/bin
# Start services
confluent local start
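# (Optional, not in the original gist) Sanity-check that the local services came up.
# Assumes the same Confluent CLI generation that provides `confluent local start`;
# newer CLI releases use `confluent local services status` instead.
confluent local status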
# Start load-gen - Orders table
ksql-datagen value-format=json msgRate=20000 quickstart=orders printRows=false nThreads=20
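# (Optional, not in the original gist) Peek at a few generated records to confirm the load-gen is producing.
# Assumes the datagen default topic name for the orders quickstart, orders_kafka_topic_json (the topic subscribed to below).
kafka-console-consumer --bootstrap-server localhost:9092 --topic orders_kafka_topic_json --from-beginning --max-messages 5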
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4,org.apache.hudi:hudi-spark-bundle_2.11:0.5.3,org.apache.spark:spark-avro_2.11:2.4.4 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
:paste
import spark.implicits._
import scala.concurrent.duration._
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.functions._
import java.util.Calendar
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession, functions}
import org.apache.spark.{SparkConf, SparkContext};
sc.setLogLevel("ERROR")
class EventCollector extends StreamingQueryListener {
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {
    println("Query started, Time: " + Calendar.getInstance().getTime())
  }
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
    println("Progress: " + event.progress + ", Time: " + Calendar.getInstance().getTime())
  }
  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {
    println("Query terminated, Time: " + Calendar.getInstance().getTime())
  }
}
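// (Added for completeness; the gist defines the listener but never registers it.)
// Attach the collector so the callbacks above actually print progress:
spark.streams.addListener(new EventCollector())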
// Read from Kafka - Topic: orders_kafka_topic_json
// Note: maxOffsetsPerTrigger makes it possible to reproduce the same load across repeated experiments when starting fresh
// Use the earliest starting offset to ensure we restart from the same state
val inputDf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "orders_kafka_topic_json")
.option("maxOffsetsPerTrigger", 10000)
.option("startingOffsets", "earliest")
.load()
val ordersJsonDf : Dataset[Row] = inputDf.selectExpr("CAST(value AS STRING)")
// Schema for value part of kafka payload. Matches Orders schema
val addr_struct = new StructType().add("city", DataTypes.StringType).add("state", DataTypes.StringType).add("zipcode", DataTypes.IntegerType)
val struct = new StructType().add("ordertime", DataTypes.LongType).add("orderid", DataTypes.IntegerType).add("itemid", DataTypes.StringType).add("orderunits", DataTypes.DoubleType).add("address", addr_struct)
// Add partition path col with a literal value
val outputDF = ordersJsonDf.select(from_json(col("value"), struct).as("orders")).withColumn("partitionpath", functions.lit("default"))
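// (Optional, added for illustration) Confirm the parsed schema before wiring up the sink:
outputDF.printSchema()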
// start writing to Hudi
val query = outputDF.writeStream.format("hudi").option("path", "file:///tmp/orders_stream_hudi/")
.option("checkpointLocation", "file:///tmp/orders_stream_hudi_ckpt/")
//.option("hoodie.parquet.small.file.limit", "0")
.option("hoodie.insert.shuffle.parallelism", "4")
.option("hoodie.upsert.shuffle.parallelism", "4")
.option("hoodie.datasource.write.operation", "upsert")
.option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
.option("hoodie.datasource.write.recordkey.field", "orders.orderid")
.option("hoodie.datasource.write.precombine.field","orders.ordertime")
.option("hoodie.datasource.write.partitionpath.field", "partitionpath")
.option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.SimpleKeyGenerator")
.option("hoodie.table.name", "hudi_streaming_orders")
.trigger(Trigger.ProcessingTime("60 seconds")).start()
query.awaitTermination()
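// (Optional, not in the original gist) From a second spark-shell, take a quick snapshot read of the Hudi
// table to verify commits are landing. The path and the single "default" partition are assumed from the
// write options above; adjust the glob if your layout differs.
// val snapshotDF = spark.read.format("hudi").load("file:///tmp/orders_stream_hudi/default/*")
// snapshotDF.groupBy("_hoodie_commit_time").count().show(false)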
// Variant 2: same pipeline, writing a non-partitioned MERGE_ON_READ table
val inputDf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "orders_kafka_topic_json").load()
// Extract the value as a JSON string
val ordersJsonDf = inputDf.selectExpr("CAST(value AS STRING)")
val addr_struct = new StructType().add("city", DataTypes.StringType).add("state", DataTypes.StringType).add("zipcode", DataTypes.IntegerType)
// Schema for parsing the JSON value into a struct
val struct = new StructType().add("ordertime", DataTypes.LongType).add("orderid", DataTypes.IntegerType).add("itemid", DataTypes.StringType).add("orderunits", DataTypes.DoubleType).add("address", addr_struct)
val outputDF = ordersJsonDf.select(from_json($"value", struct).as("orders"))
// Start writing to Hudi
outputDF.writeStream.format("hudi")
  .option("path", "file:///tmp/orders_stream_hudi/")
  .option("checkpointLocation", "file:///tmp/orders_stream_hudi_ckpt/")
  .option("hoodie.insert.shuffle.parallelism", "2")
  .option("hoodie.upsert.shuffle.parallelism", "2")
  .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
  .option("hoodie.datasource.write.recordkey.field", "orders.orderid")
  .option("hoodie.datasource.write.precombine.field", "orders.ordertime")
  .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
  .option("hoodie.table.name", "hudi_streaming_orders")
  .start()
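// (Added note) This variant calls .start() without keeping the query handle; to block the shell on it
// as in the first variant, grab the active query, e.g. (illustrative sketch):
// val morQuery = spark.streams.active.last
// morQuery.awaitTermination()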

cb149 commented Jun 9, 2021

I just created an application based on the steps in your code to write data from Kafka to HDFS using Hudi 0.8.0 and Spark 2.4.0.
In general everything seems to work as intended; however, I observed issues with the checkpoint, which seems to be written even when there is an error or when query.stop() is called.

E.g. I called query.stop() pretty much right after a new micro-batch started, so effectively only 1 message was processed and no data was written to Hudi; however, the checkpoint files in both /offsets and /commits were created, so the next batch would start from the wrong offset and effectively skip messages.

Could this issue be related to using Hudi as a Sink for Spark Structured Streaming or originate from something else?
