Skip to content

Instantly share code, notes, and snippets.

@Kiollpt
Last active November 5, 2020 09:57
Show Gist options
  • Save Kiollpt/299552accd5d07cc314f78c1d1d80cd5 to your computer and use it in GitHub Desktop.
Save Kiollpt/299552accd5d07cc314f78c1d1d80cd5 to your computer and use it in GitHub Desktop.
#spark
def main(args: Array[String]): Unit = {
val ss = SparkSession.builder().master("local[*]").appName("app1").getOrCreate()
import ss.implicits._
val filePath = "src/resource"
val schema = Encoders.product[order].schema
val ds = ss.readStream.option("header","true").schema(schema).csv(filePath)
val ds1 = ds.select( lit("1") as "key",to_json(struct("*"))as "value")
val query = ds1.writeStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic","taxi")
.option("checkpointLocation", "/tmp")
.start()
query.awaitTermination()
}
val filePath = "src/resource"
val ss = SparkSession.builder().master("local[*]").appName("app1").getOrCreate()
//ss.conf.set("spark.sql.streaming.schemaInference","true")
import ss.implicits._
val schema = Encoders.product[order].schema
//val ds = ss.readStream.option("header","true").schema(sche1).csv("src/resource")
val dfs = ss.readStream.format("kafka").option("kafka.bootstrap.servers","localhost:9092")
.option("startingOffsets", "earliest")
//.option("maxOffsetsPerTrigger",1)
.option("subscribe","taxi").load()
val ds = dfs.select(from_json($"value".cast("string"),schema) as "record").select("record.*").as[order]

val perMinCount = ds.filter(col("pickup zone")=== "Union Sq").withWatermark("tpep_pickup_datetime", "10 minutes") .groupBy($"pickup zone",window($"tpep_pickup_datetime", "1 hours","3 minutes")).agg() //.orderBy($"window".desc) val query1 = perMinCount.writeStream.outputMode("update").format("console").option("truncate", false).start()

val perMinCount = ds.filter(col("pickup zone")=== "Union Sq").withWatermark("tpep_pickup_datetime", "10 minutes")
.groupBy($"pickup zone",window($"tpep_pickup_datetime", "1 hours","3 minutes")).agg()
//.orderBy($"window".desc)
val query1 = perMinCount.writeStream.outputMode("update").format("console").option("truncate", false).start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment