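This gist wires a Spark Structured Streaming job from Kafka into two sinks: CSV files and a MySQL table. Running it needs the Kafka source and a MySQL JDBC driver on the classpath; a build.sbt sketch follows, where the version numbers are assumptions not pinned by the gist, so align them with the Spark version you actually run:

// build.sbt (sketch) -- versions are assumptions; match your cluster
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "3.3.2" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.3.2",  // Kafka source
  "mysql"            %  "mysql-connector-java" % "8.0.33"  // JDBC driver
)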
package com.mts.matrix.spark.stream

import com.mts.matrix.spark.utils.SparkUtils
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions.{col, from_json, lit}
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

// Typed view of a raw Kafka record. The gist references this class without
// defining it, so the field types here are inferred from the selectExpr below.
case class KafkaMessage(partition: Int, offset: Long, key: String, value: String)

object KafkaStreamReaderWriteIntoMySQL {
  val brokers = "localhost:9092"
  val topics = "iwinner_order_service_lap_1003"
  val startingOffsets = "earliest"
  val outFilePath = "C:/data/outputcsv"
  val checkpointLocation = "C:/data/checkpointcsv"
  val queryName = "Input Query Process"

  // JDBC connection settings. The original declared empty username/password
  // vals but hard-coded "root"/"root" in the Properties below; they are
  // consolidated here so the credentials live in one place.
  val url = "jdbc:mysql://localhost:3306/meetup_db"
  val username = "root"
  val password = "root"
  def kafkaReadStream(): Unit = {
    val spark = SparkUtils.getSparkSession()

    val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", topics)
      // Consumer group ids must carry the "kafka." prefix (Spark 3.0+);
      // a bare "group.id" option is not passed through to the consumer.
      .option("kafka.group.id", "spark-consumer-iwinner-service-ldap-1003")
      .option("startingOffsets", startingOffsets)
      .load()
    // Schema of the JSON payload carried in the Kafka message value.
    val schema = new StructType()
      .add("orderId", StringType)
      .add("orderName", StringType)
      .add("orderCustomerNumber", StringType)
      .add("orderStatus", StringType)
      .add("orderStartDate", StringType)
      .add("orderDueDate", StringType)
      .add("orderDiscount", StringType)
      .add("orderISN", StringType)
      .add("orderStoreId", StringType)
      .add("orderStaffId", StringType)
      .add("totalOrders", IntegerType)
      .add("orderPrice", StringType)
      .add("processTimestamp", StringType)
    import spark.implicits._

    // Cast the binary key/value to strings and expose a typed view.
    val message = readStream
      .selectExpr("partition", "offset", "CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[KafkaMessage]

    // Parse the JSON value (already a string here) and flatten it into
    // one column per field.
    val outputDF = message
      .select(from_json(col("value"), schema).as("data"))
      .select("data.*")
    // Sink 1: append each micro-batch to CSV files.
    val fileStream: StreamingQuery = outputDF.writeStream
      .format("csv")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .option("checkpointLocation", checkpointLocation)
      .option("path", outFilePath)
      .queryName(queryName)
      .start()
    // Sink 2: write each micro-batch to MySQL via foreachBatch.
    val prop = new java.util.Properties()
    prop.put("user", username)
    prop.put("password", password)
    // Connector/J 8.x driver class; use "com.mysql.jdbc.Driver" on the 5.x connector.
    prop.put("driver", "com.mysql.cj.jdbc.Driver")

    val jdbcStream: StreamingQuery = outputDF.writeStream
      .trigger(Trigger.ProcessingTime("10 seconds"))
      // "complete" mode (as in the original) requires a streaming aggregation,
      // which this query does not have; "append" is the valid mode here.
      .outputMode("append")
      // Every streaming query needs its own checkpoint directory.
      .option("checkpointLocation", checkpointLocation + "_jdbc")
      .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
        val df = batchDf.withColumn("batchId", lit(batchId))
        df.printSchema()
        df.write.mode(SaveMode.Append).jdbc(url, "meetup_rsvp_tbl", prop)
        df.show(20, false)
      }
      .start()

    // Block on both queries. The original called fileStream.awaitTermination()
    // before starting the JDBC query, so the MySQL sink never ran.
    spark.streams.awaitAnyTermination()
  }
  def main(args: Array[String]): Unit = {
    kafkaReadStream()
  }
}
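The job depends on SparkUtils.getSparkSession(), which the gist does not include. A minimal sketch of what it plausibly looks like, assuming a local-mode session; the appName and master values here are assumptions, not the original helper:

package com.mts.matrix.spark.utils

import org.apache.spark.sql.SparkSession

// Hypothetical stand-in for the helper the gist imports but does not show.
object SparkUtils {
  def getSparkSession(): SparkSession =
    SparkSession.builder()
      .appName("KafkaStreamReaderWriteIntoMySQL")
      .master("local[*]") // assumption: run locally, as the C:/data paths suggest
      .getOrCreate()
}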
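To smoke-test the pipeline end to end, you can publish one record shaped like the schema above to the subscribed topic. A sketch using the plain kafka-clients producer; the SampleOrderProducer name and all field values are invented for the test:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Hypothetical one-shot producer for manual testing; values are made up.
object SampleOrderProducer {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // One JSON payload matching the StructType the stream expects.
    val value =
      """{"orderId":"O-1001","orderName":"Laptop","orderCustomerNumber":"C-77",
        |"orderStatus":"OPEN","orderStartDate":"2021-01-01","orderDueDate":"2021-01-15",
        |"orderDiscount":"0.10","orderISN":"ISN-1","orderStoreId":"S-9","orderStaffId":"E-3",
        |"totalOrders":1,"orderPrice":"999.99","processTimestamp":"2021-01-01 10:00:00"}""".stripMargin
    producer.send(new ProducerRecord[String, String]("iwinner_order_service_lap_1003", "O-1001", value))
    producer.flush()
    producer.close()
  }
}

If the streaming job is running, the parsed row should land in C:/data/outputcsv and in the meetup_rsvp_tbl MySQL table within a trigger interval or two.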