// Spark Structured Streaming: taxi supply/demand demo

import org.apache.spark.sql.{Encoders, Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
import org.apache.spark.sql.types._
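// The original gist does not include the case classes it references. A minimal
// sketch of what they might look like, inferred from the fields used below
// (assumption -- the real order class likely carries more CSV columns):
case class order(`pickup zone`: String, tpep_pickup_datetime: java.sql.Timestamp)
case class State(zone: String, supply: Int)
case class Result(ratio: Float, supply: Int, demand: Int, zone: String)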
// --- Producer: stream taxi order CSV files into a Kafka topic ---
val ss = SparkSession.builder().master("local[*]").appName("app1").getOrCreate()
import ss.implicits._
val filePath = "src/resource"
val topic = "taxi1"
val cpDir = "/tmp/taxi-harry"
val schema = Encoders.product[order].schema
val ds = ss.readStream.option("header", "true").schema(schema).csv(filePath)
// serialize each row to a JSON string so it can be sent as the Kafka message value
val ds1 = ds.select(lit("1") as "key", to_json(struct("*")) as "value")
//val query1 = ds1.writeStream.format("console").outputMode("update").start()
val query = ds1.writeStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "taxi1")
  .option("checkpointLocation", cpDir)
  .start()
//query1.awaitTermination()
query.awaitTermination()

// --- Consumer: read the Kafka topic, parse JSON records into orders, and track per-zone state ---
val filePath = "src/resource"
val ss = SparkSession.builder().master("local[1]").appName("app1").getOrCreate()
//ss.conf.set("spark.sql.streaming.schemaInference","true")
ss.sparkContext.setLogLevel("WARN")
import ss.implicits._
val schema = Encoders.product[order].schema
//val ds = ss.readStream.option("header","true").schema(sche1).csv("src/resource")
val dfs = ss.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("startingOffsets", "earliest")
  //.option("maxOffsetsPerTrigger", 1)
  .option("subscribe", "taxi1")
  .load()
// parse the Kafka value payload back into typed order rows
val ds = dfs.select(from_json($"value".cast("string"), schema) as "record").select("record.*").as[order]
// For each pickup zone, keep a running supply count across micro-batches and
// emit the ratio of accumulated supply to the current batch size.
val output = ds.groupByKey(_.`pickup zone`).mapGroupsWithState[State, Result](GroupStateTimeout.NoTimeout()) {
  case (key: String, values: Iterator[order], state: GroupState[State]) =>
    val data = values.toSeq
    println(data)
    val size = data.size
    val updatedState = if (state.exists) {
      val prevSupply = state.get.supply
      println("update")
      State(key, prevSupply + size)
    } else {
      // first batch seen for this key
      State(key, 1)
    }
    state.update(updatedState)
    println(s"batch size: $size, accumulated supply: ${state.get.supply}")
    val ratio = state.get.supply.toFloat / size
    Result(ratio, state.get.supply, size, key)
}
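// A minimal sketch (not part of the original gist) of sinking the stateful stream
// above to the console; mapGroupsWithState output is emitted in "update" mode.
val stateQuery = output.writeStream
  .outputMode("update")
  .format("console")
  .option("truncate", false)
  .start()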
// Custom aggregate that counts "PICK"/"DROP" events into (supply, demand) buffers
// and returns their sum as a Double.
class Harry extends UserDefinedAggregateFunction {
  override def inputSchema = StructType(StructField("event", StringType) :: Nil)

  override def bufferSchema = StructType(
    StructField("supply", LongType) :: StructField("demand", LongType) :: Nil)

  override def dataType: DataType = DoubleType

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // any value other than "PICK" or "DROP" will throw a MatchError
    val (supply, demand) = input.getString(0) match {
      case "PICK" => (1L, 1L)
      case "DROP" => (0L, 1L)
    }
    buffer.update(0, supply + buffer.getLong(0))
    buffer.update(1, demand + buffer.getLong(1))
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    println(buffer1.getAs[Long](0), buffer2.getLong(0), buffer1.getAs[Long](1), buffer2.getAs[Long](1))
    buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)
    buffer1(1) = buffer1.getAs[Long](1) + buffer2.getAs[Long](1)
  }

  override def evaluate(buffer: Row): Any = {
    (buffer.getLong(0) + buffer.getLong(1)).toDouble
  }
}
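// Hypothetical usage of the UDAF (assumption -- no "event" column exists in the order
// schema above; also note that UserDefinedAggregateFunction is deprecated in Spark 3.x
// in favour of org.apache.spark.sql.expressions.Aggregator):
val harry = new Harry
val events = Seq(("zone1", "PICK"), ("zone1", "DROP"), ("zone2", "PICK")).toDF("zone", "event")
events.groupBy($"zone").agg(harry($"event") as "supply_plus_demand").show()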
// Windowed count of pickups per zone per minute, with a 10-minute watermark on the event time.
val perMinCount = ds.withWatermark("tpep_pickup_datetime", "10 minutes")
  .groupBy($"pickup zone", window($"tpep_pickup_datetime", "1 minutes")).count()
val query = perMinCount.writeStream.outputMode("update").format("console").option("truncate", false).start()
query.awaitTermination()