Created
January 9, 2021 21:44
-
-
Save anjijava16/5fcd1d1f63b082bd3d3691ca92b1797f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mts.matrix.spark.stream | |
import com.mts.matrix.spark.utils.SparkUtils | |
import org.apache.spark.sql.{DataFrame, SaveMode} | |
import org.apache.spark.sql.functions.{col,lit, from_json} | |
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger} | |
import org.apache.spark.sql.types.{IntegerType, StringType, StructType} | |
import org.apache.spark.sql.streaming.Trigger | |
/**
 * Streams order events from Kafka and fans them out to two sinks:
 * a CSV file sink and a MySQL table (via foreachBatch JDBC writes).
 *
 * Fixes over the previous revision:
 *  - The JDBC stream was started only AFTER `fileStream.awaitTermination()`,
 *    which blocks forever, so the MySQL path never ran. Both queries are now
 *    started first and we block with `awaitAnyTermination()`.
 *  - `outputMode("complete")` is invalid without an aggregation and fails at
 *    query start; the foreachBatch sink now uses `"append"`.
 *  - The JDBC stream now has its own checkpoint location so it can recover.
 */
object KafkaStreamReaderWriteIntoMySQL {

  // Kafka source configuration.
  val brokers = "localhost:9092"
  val topics = "iwinner_order_service_lap_1003"
  val startingOffsets = "earliest"

  // File-sink configuration. NOTE(review): Windows-style paths — confirm
  // these match the deployment environment.
  val outFilePath = "C:/data/outputcsv"
  val checkpointLocation = "C:/data/checkpointcsv"
  val queryName = "Input Query Process"

  // JDBC sink configuration.
  // NOTE(review): `username`/`password` are empty and unused — the JDBC
  // Properties below hard-code "root"/"root". Consolidate once credentials
  // handling is decided (ideally externalized, not committed in source).
  val url = "jdbc:mysql://localhost:3306/meetup_db"
  val username = ""
  val password = ""

  /**
   * Builds the Kafka source, parses the JSON order payload against a fixed
   * schema, and runs two concurrent streaming queries (CSV + MySQL).
   * Blocks until either query terminates.
   */
  def kafkaReadStream(): Unit = {
    val spark = SparkUtils.getSparkSession()

    // Raw Kafka records; value is the JSON-encoded order event.
    val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", topics)
      .option("group.id", "spark-consumer-iwinner-service-ldap-1003")
      .option("startingOffsets", startingOffsets)
      .load()

    // Expected shape of the JSON payload. Fields not matching the schema
    // parse to null (from_json default behavior).
    val schema = new StructType()
      .add("orderId", StringType)
      .add("orderName", StringType)
      .add("orderCustomerNumber", StringType)
      .add("orderStatus", StringType)
      .add("orderStartDate", StringType)
      .add("orderDueDate", StringType)
      .add("orderDiscount", StringType)
      .add("orderISN", StringType)
      .add("orderStoreId", StringType)
      .add("orderStaffId", StringType)
      .add("totalOrders", IntegerType)
      .add("orderPrice", StringType)
      .add("processTimestamp", StringType)

    import spark.implicits._

    val message = readStream
      .selectExpr("partition", "offset", "CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[KafkaMessage]

    // Parse the JSON value column and flatten to one column per order field.
    val outputDF = message
      .selectExpr("CAST(value AS STRING) as data")
      .select(from_json(col("data"), schema).as("data"))
      .select("data.*")

    // Sink 1: append parsed orders to CSV files.
    val fileStream: StreamingQuery = outputDF.writeStream
      .format("csv")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .option("checkpointLocation", checkpointLocation)
      .option("path", outFilePath)
      .queryName(queryName)
      .start()

    val prop = new java.util.Properties()
    prop.put("user", "root")
    prop.put("password", "root")
    prop.put("driver", "com.mysql.jdbc.Driver")

    // Sink 2: each micro-batch is tagged with its batchId and appended to
    // MySQL. "append" (not "complete") — there is no aggregation upstream.
    val jdbcStream: StreamingQuery = outputDF.writeStream
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .outputMode("append")
      .option("checkpointLocation", checkpointLocation + "_jdbc")
      .queryName("JDBC Sink Process")
      .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
        val taggedDf = batchDf.withColumn("batchId", lit(batchId))
        taggedDf.printSchema()
        taggedDf.write.mode(SaveMode.Append).jdbc(url, "meetup_rsvp_tbl", prop)
        taggedDf.show(20, false)
      }
      .start()

    // Block on whichever query finishes (or fails) first; both run until then.
    spark.streams.awaitAnyTermination()
  }

  def main(args: Array[String]): Unit = {
    kafkaReadStream()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment