@nsivabalan
Created August 24, 2022 18:09
./bin/spark-shell \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --driver-memory 8g --executor-memory 9g \
  --jars ~/Documents/personal/projects/nov26/hudi/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-0.11.0-SNAPSHOT.jar
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
// Define the Kafka source: read the "impressions" topic from the earliest offset,
// capping each micro-batch at 5000 records and tolerating expired/deleted offsets.
val dataStreamReader = spark.
  readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("subscribe", "impressions").
  option("startingOffsets", "earliest").
  option("maxOffsetsPerTrigger", 5000).
  option("failOnDataLoss", false)
// Schema of the JSON payload carried in the Kafka message value.
// Note: "impresssiontime" must match the key spelling in the incoming records, so it is left as-is.
val schema = new StructType().
  add("impresssiontime", LongType).
  add("impressionid", StringType).
  add("userid", StringType).
  add("adid", StringType)
// Parse the Kafka records: surface the Kafka metadata columns, decode the JSON value
// with the schema above, and derive a date string (partition_date) from the ingestion time.
val df = dataStreamReader.load().
  selectExpr(
    "topic as kafka_topic",
    "CAST(partition AS STRING) kafka_partition",
    "CAST(timestamp AS STRING) kafka_timestamp",
    "CAST(offset AS STRING) kafka_offset",
    "CAST(key AS STRING) kafka_key",
    "CAST(value AS STRING) kafka_value",
    "current_timestamp() current_time").
  selectExpr(
    "kafka_topic",
    "concat(kafka_partition,'-',kafka_offset) kafka_partition_offset",
    "kafka_offset",
    "kafka_timestamp",
    "kafka_key",
    "kafka_value",
    "substr(current_time,1,10) partition_date").
  select(
    col("kafka_topic"), col("kafka_partition_offset"), col("kafka_offset"), col("kafka_timestamp"),
    col("kafka_key"), col("kafka_value"),
    from_json(col("kafka_value"), schema).as("data"), col("partition_date")).
  select(
    "kafka_topic", "kafka_partition_offset", "kafka_offset", "kafka_timestamp", "kafka_key", "kafka_value",
    "data.impresssiontime", "data.impressionid", "data.userid", "data.adid", "partition_date")
// Define the Hudi sink: a COPY_ON_WRITE table keyed on adid, partitioned by userid,
// precombined on impresssiontime, with Hive sync disabled and async clustering enabled
// (plan every 2 commits, sorted by kafka_partition_offset).
val writer = df.
  writeStream.format("org.apache.hudi").
  option(TABLE_TYPE.key, "COPY_ON_WRITE").
  option(PRECOMBINE_FIELD.key, "impresssiontime").
  option(RECORDKEY_FIELD.key, "adid").
  option(PARTITIONPATH_FIELD.key, "userid").
  option(HIVE_SYNC_ENABLED.key, false).
  option(HIVE_STYLE_PARTITIONING.key, true).
  option(FAIL_ON_TIMELINE_ARCHIVING_ENABLE.key, false).
  option(STREAMING_IGNORE_FAILED_BATCH.key, false).
  option(STREAMING_RETRY_CNT.key, 0).
  option("hoodie.table.name", "copy_on_write_table").
  option("hoodie.cleaner.commits.retained", "6").
  option("hoodie.keep.min.commits", "10").
  option("hoodie.keep.max.commits", "15").
  option("hoodie.clustering.plan.strategy.target.file.max.bytes", "10485760").
  option("hoodie.clustering.plan.strategy.small.file.limit", "200000000").
  option("hoodie.clustering.plan.strategy.sort.columns", "kafka_partition_offset").
  option("hoodie.clustering.async.max.commits", "2").
  option("hoodie.datasource.clustering.async.enable", "true").
  option("hoodie.clustering.async.enabled", "true").
  option("hoodie.clustering.updates.strategy", "org.apache.hudi.client.clustering.update.strategy.SparkAllowUpdateStrategy").
  option("checkpointLocation", "/tmp/hudi_streaming_kafka/checkpoint/").
  outputMode(OutputMode.Append())

// Start the streaming query with a 30-second processing-time trigger.
writer.trigger(Trigger.ProcessingTime(30000)).start("/tmp/hudi_streaming_kafka/COPY_ON_WRITE")
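
// After a few triggers have committed, the table can be read back as a batch for a quick check.
// A minimal sketch, assuming the 0.11-SNAPSHOT bundle can load the base path passed to start()
// directly (older Hudi releases required a partition glob on the path).
val hudiDf = spark.read.format("org.apache.hudi").load("/tmp/hudi_streaming_kafka/COPY_ON_WRITE")
hudiDf.select("adid", "userid", "impresssiontime", "partition_date").show(10, false)
hudiDf.groupBy("userid").count().show(false)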